From f9584b1379f6d269eb2746c4941dcb8965b589fd Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Wed, 14 Aug 2024 11:21:22 -0400 Subject: [PATCH 01/18] Update go.mod changing path to repo for local fork --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 92b568d..b0bf239 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/argus-labs/go-jobqueue +module github.com/ezavada/go-jobqueue go 1.22 From 27423106b2837bb9ceb83e5fbf8d1314d76e8a52 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Thu, 15 Aug 2024 10:41:13 -0400 Subject: [PATCH 02/18] Log worker id, add concurrency test --- jobqueue.go | 12 ++++++------ jobqueue_test.go | 29 +++++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/jobqueue.go b/jobqueue.go index f21c745..5865f40 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -145,7 +145,7 @@ func (jq *JobQueue[T]) worker(id int) { // Worker stops running when the job channel is closed for job := range jq.jobs { - err := jq.processJob(job) + err := jq.processJob(job, id) if err != nil { logger.Error().Err(err).Uint64("jobID", job.ID).Msg("Error processing job") } @@ -155,12 +155,12 @@ func (jq *JobQueue[T]) worker(id int) { } // processJob processes a job and updates its status in the database. -func (jq *JobQueue[T]) processJob(job *job[T]) error { +func (jq *JobQueue[T]) processJob(job *job[T], worker int) error { logger := jq.logger.With().Uint64("jobID", job.ID).Logger() if logger.GetLevel() == zerolog.DebugLevel { - logger.Debug().Interface("jobPayload", job.Payload).Msg("Processing job") + logger.Debug().Interface("jobPayload", job.Payload).Int("worker", worker).Msg("Processing job") } else { - logger.Info().Msg("Processing job") + logger.Info().Int("worker", worker).Msg("Processing job") } if err := job.Process(jq.handler); err != nil { @@ -170,7 +170,7 @@ func (jq *JobQueue[T]) processJob(job *job[T]) error { logger.Info().Msg("Job processed successfully") // Now that we've successfully processed the job, we can remove it from BadgerDB - jq.logger.Debug().Uint64("jobID", job.ID).Msg("Removing job from BadgerDB") + jq.logger.Debug().Uint64("jobID", job.ID).Int("worker", worker).Msg("Removing job from BadgerDB") err := jq.db.Update(func(txn *badger.Txn) error { if err := txn.Delete(job.dbKey()); err != nil { return err @@ -183,7 +183,7 @@ func (jq *JobQueue[T]) processJob(job *job[T]) error { } // Remove the job from the in-memory index - jq.logger.Debug().Uint64("jobID", job.ID).Msg("Removing job from in-memory index") + jq.logger.Debug().Uint64("jobID", job.ID).Int("worker", worker).Msg("Removing job from in-memory index") jq.isJobIDInQueue.Delete(job.ID) return nil diff --git a/jobqueue_test.go b/jobqueue_test.go index 512f59d..5be98e2 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -165,7 +165,7 @@ func TestJobQueue_ProcessJob(t *testing.T) { assert.WithinDuration(t, time.Now(), j.CreatedAt, time.Second) // Process the job - assert.NoError(t, jq.processJob(j)) + assert.NoError(t, jq.processJob(j, 0)) // Check that the job is removed from the in-memory index _, ok := jq.isJobIDInQueue.Load(ids[i]) @@ -207,12 +207,37 @@ func TestJobQueue_Recovery(t *testing.T) { assert.Equal(t, j.Payload, testJob{Msg: "hello"}) // Process the job in recovered job queue - assert.NoError(t, recoveredJq.processJob(j)) + assert.NoError(t, recoveredJq.processJob(j, 0)) // Stop recovered job queue assert.NoError(t, recoveredJq.Stop()) } +func TestJobConcurrency(t *testing.T) { + cleanupBadgerDB(t) + + // create initial job queue + jq, err := New[testJob]("/tmp/badger", "test-job", 5, testJobHandler()) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, jq.Stop()) + cleanupBadgerDB(t) + }) + + // Queue a bunch of jobs, which should be processed concurrently + //ids := make([]uint64, 0) + for i := 0; i < 20; i++ { + j := testJob{Msg: fmt.Sprintf("hello %d", i)} + + _, err := jq.Enqueue(j) + assert.NoError(t, err) + + //ids = append(ids, id) + } + + time.Sleep(time.Second) +} + func readJob(db *badger.DB, id uint64) ([]byte, error) { return readKey(db, fmt.Sprintf("%s%d", jobDBKeyPrefix, id)) } From fc43a9a6b5e2452888f1c39d79f1fd4a54762a31 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Thu, 15 Aug 2024 10:49:52 -0400 Subject: [PATCH 03/18] make concurrency test check that all jobs were processed --- jobqueue_test.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/jobqueue_test.go b/jobqueue_test.go index 5be98e2..33581b3 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -225,17 +225,31 @@ func TestJobConcurrency(t *testing.T) { }) // Queue a bunch of jobs, which should be processed concurrently - //ids := make([]uint64, 0) + ids := make([]uint64, 0) for i := 0; i < 20; i++ { j := testJob{Msg: fmt.Sprintf("hello %d", i)} - _, err := jq.Enqueue(j) + id, err := jq.Enqueue(j) assert.NoError(t, err) - //ids = append(ids, id) + ids = append(ids, id) } + // Give time for all jobs to be processed time.Sleep(time.Second) + + // Check that all jobs were processed + for i := 0; i < 10; i++ { + // Check that the job is removed from the in-memory index + _, ok := jq.isJobIDInQueue.Load(ids[i]) + assert.False(t, ok) + + // Check that the job is no longer in the badger DB + value, err := readJob(jq.db, ids[i]) + assert.Error(t, err, badger.ErrKeyNotFound) + assert.Nil(t, value) + } + } func readJob(db *badger.DB, id uint64) ([]byte, error) { From 61412bc871ca6b6bba0cbe51442fb5e9c43ee4bc Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Thu, 15 Aug 2024 12:13:34 -0400 Subject: [PATCH 04/18] Collecting and reporting job time stats --- go.mod | 1 + go.sum | 2 ++ jobqueue.go | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++--- timestat.go | 41 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 3 deletions(-) create mode 100644 timestat.go diff --git a/go.mod b/go.mod index b0bf239..ad6e4e5 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/google/flatbuffers v1.12.1 // indirect github.com/klauspost/compress v1.12.3 // indirect github.com/kr/text v0.2.0 // indirect + github.com/loov/hrtime v1.0.3 github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index e09ff15..3132297 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,8 @@ github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/loov/hrtime v1.0.3 h1:LiWKU3B9skJwRPUf0Urs9+0+OE3TxdMuiRPOTwR0gcU= +github.com/loov/hrtime v1.0.3/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= diff --git a/jobqueue.go b/jobqueue.go index 5865f40..0002a1d 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -10,6 +10,7 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/goccy/go-json" + "github.com/loov/hrtime" "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -46,6 +47,21 @@ type JobQueue[T any] struct { // Options fetchInterval time.Duration + + // Stats + statsLock sync.Mutex // protects the stats below + + // job stats + jobRunTime TimeStat // stats on time that it takes to run a job (across all workers) + jobQueuedTime TimeStat // stats on how much time a job sits in the queue before being processed + + // queue stats + busyTime TimeStat // stats on time that the queue actively processing jobs + idleTime TimeStat // stats on how much time the queue is empty between jobs being processed + jobsProcessed int + jobsEnqueued int + jobsFailed int + jobsSucceeded int } // New creates a new JobQueue with the specified database, name, and number @@ -75,6 +91,16 @@ func New[T any]( jobs: make(chan *job[T], defaultJobBufferSize), fetchInterval: defaultFetchInterval, + + statsLock: sync.Mutex{}, + jobRunTime: TimeStat{}, + jobQueuedTime: TimeStat{}, + busyTime: TimeStat{}, + idleTime: TimeStat{}, + jobsProcessed: 0, + jobsEnqueued: 0, + jobsFailed: 0, + jobsSucceeded: 0, } for _, opt := range opts { opt(jq) @@ -131,6 +157,9 @@ func (jq *JobQueue[T]) Enqueue(payload T) (uint64, error) { jq.logger.Error().Err(err).Uint64("jobID", job.ID).Msg("Failed to enqueue job") return 0, err } + jq.statsLock.Lock() + jq.jobsEnqueued++ + jq.statsLock.Unlock() jq.logger.Info().Uint64("jobID", job.ID).Msg("job enqueued successfully") return job.ID, nil @@ -163,15 +192,26 @@ func (jq *JobQueue[T]) processJob(job *job[T], worker int) error { logger.Info().Int("worker", worker).Msg("Processing job") } - if err := job.Process(jq.handler); err != nil { + queuedTime := time.Since(job.CreatedAt) + startTime := hrtime.Now() + err := job.Process(jq.handler) + runTime := hrtime.Since(startTime) + jq.statsLock.Lock() + jq.jobsProcessed++ + jq.jobRunTime.RecordTime(runTime) + jq.jobQueuedTime.RecordTime(queuedTime) + if err != nil { + jq.jobsFailed++ + jq.statsLock.Unlock() return fmt.Errorf("failed to process job: %w", err) } - + jq.jobsSucceeded++ + jq.statsLock.Unlock() logger.Info().Msg("Job processed successfully") // Now that we've successfully processed the job, we can remove it from BadgerDB jq.logger.Debug().Uint64("jobID", job.ID).Int("worker", worker).Msg("Removing job from BadgerDB") - err := jq.db.Update(func(txn *badger.Txn) error { + err = jq.db.Update(func(txn *badger.Txn) error { if err := txn.Delete(job.dbKey()); err != nil { return err } @@ -213,6 +253,17 @@ func (jq *JobQueue[T]) Stop() error { return err } + if jq.jobsEnqueued+jq.jobsProcessed > 0 { + jq.logger.Info(). + Int("jobsProcessed", jq.jobsProcessed). + Int("jobsEnqueued", jq.jobsEnqueued). + Int("jobsFailed", jq.jobsFailed). + Int("jobsSucceeded", jq.jobsSucceeded). + Str("jobRunTime", jq.jobRunTime.String()). + Str("jobQueuedTime", jq.jobQueuedTime.String()). + Msg("Job queue stats") + } + jq.logger.Info().Msg("Job queue stopped successfully") return nil diff --git a/timestat.go b/timestat.go new file mode 100644 index 0000000..386619d --- /dev/null +++ b/timestat.go @@ -0,0 +1,41 @@ +package jobqueue + +import ( + "time" +) + +type TimeStat struct { + TotalTime time.Duration + MinTime time.Duration + MaxTime time.Duration + Count int +} + +func (ts *TimeStat) AvgTime() time.Duration { + if ts.Count == 0 { + return 0 + } + return ts.TotalTime / time.Duration(ts.Count) +} + +func (ts *TimeStat) Reset() { + ts.TotalTime = 0 + ts.MinTime = 0 + ts.MaxTime = 0 + ts.Count = 0 +} + +func (ts *TimeStat) RecordTime(duration time.Duration) { + ts.TotalTime += duration + ts.Count++ + if ts.MinTime == 0 || duration < ts.MinTime { + ts.MinTime = duration + } + if duration > ts.MaxTime { + ts.MaxTime = duration + } +} + +func (ts *TimeStat) String() string { + return "tot " + ts.TotalTime.String() + " avg " + ts.AvgTime().String() + " min " + ts.MinTime.String() + " max " + ts.MaxTime.String() +} From ec2c727b664c663f2276da7e0d45265c41f22823 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Thu, 15 Aug 2024 16:52:29 -0400 Subject: [PATCH 05/18] Added busy and idle wall time stats for queue --- go.mod | 2 ++ go.sum | 2 ++ jobqueue.go | 38 ++++++++++++++++++++++++++++++++++++-- jobqueue_test.go | 37 ++++++++++++++++++++++++++++++++----- timestat.go | 5 +++-- 5 files changed, 75 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index ad6e4e5..5b876d2 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,8 @@ require ( github.com/stretchr/testify v1.9.0 ) +require go.uber.org/atomic v1.11.0 // indirect + require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 3132297..8daad32 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/jobqueue.go b/jobqueue.go index 0002a1d..977ba23 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -14,6 +14,8 @@ import ( "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + + "go.uber.org/atomic" ) type JobStatus string @@ -62,6 +64,10 @@ type JobQueue[T any] struct { jobsEnqueued int jobsFailed int jobsSucceeded int + + busyWorkerCount atomic.Int32 + busyStateChangeAt atomic.Time + queueIsIdle atomic.Bool } // New creates a new JobQueue with the specified database, name, and number @@ -95,13 +101,18 @@ func New[T any]( statsLock: sync.Mutex{}, jobRunTime: TimeStat{}, jobQueuedTime: TimeStat{}, - busyTime: TimeStat{}, - idleTime: TimeStat{}, + busyTime: TimeStat{}, // wall time, not CPU time + idleTime: TimeStat{}, // wall time, not CPU time jobsProcessed: 0, jobsEnqueued: 0, jobsFailed: 0, jobsSucceeded: 0, } + + jq.busyWorkerCount.Store(0) + jq.busyStateChangeAt.Store(time.Now()) + jq.queueIsIdle.Store(true) + for _, opt := range opts { opt(jq) } @@ -174,7 +185,28 @@ func (jq *JobQueue[T]) worker(id int) { // Worker stops running when the job channel is closed for job := range jq.jobs { + + wasIdle := jq.queueIsIdle.Swap(false) + jq.busyWorkerCount.Inc() + if wasIdle { + timeSpentInState := time.Since(jq.busyStateChangeAt.Load()) + jq.busyStateChangeAt.Store(time.Now()) + jq.idleTime.RecordTime(timeSpentInState) + logger.Debug().Dur("timeIdle", timeSpentInState).Msg("*** Queue now busy *** ") + } + err := jq.processJob(job, id) + + if jq.busyWorkerCount.Dec() == 0 { + wasIdle := jq.queueIsIdle.Swap(true) + if !wasIdle { + timeSpentInState := time.Since(jq.busyStateChangeAt.Load()) + jq.busyStateChangeAt.Store(time.Now()) + jq.busyTime.RecordTime(timeSpentInState) + logger.Debug().Dur("timeBusy", timeSpentInState).Msg("*** Queue now idle *** ") + } + } + if err != nil { logger.Error().Err(err).Uint64("jobID", job.ID).Msg("Error processing job") } @@ -261,6 +293,8 @@ func (jq *JobQueue[T]) Stop() error { Int("jobsSucceeded", jq.jobsSucceeded). Str("jobRunTime", jq.jobRunTime.String()). Str("jobQueuedTime", jq.jobQueuedTime.String()). + Str("busyTime", jq.busyTime.String()). + Str("idleTime", jq.idleTime.String()). Msg("Job queue stats") } diff --git a/jobqueue_test.go b/jobqueue_test.go index 33581b3..fbec9ff 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -2,7 +2,9 @@ package jobqueue import ( "fmt" + "math/rand" "os" + "strconv" "testing" "time" @@ -27,12 +29,24 @@ type testJob struct { func testJobHandler() func(JobContext, testJob) error { return func(ctx JobContext, job testJob) error { - fmt.Println("Test job processed:", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing + fmt.Println("Job Performed:", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing ctx.JobCreatedAt().Unix()) return nil } } +func complexJobHandler() func(JobContext, testJob) error { + return func(ctx JobContext, job testJob) error { + fmt.Println("Starting job...", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing + ctx.JobCreatedAt().Unix()) + numMicroseconds := rand.Int63n(200) + 1 + time.Sleep(time.Duration(numMicroseconds) * time.Microsecond) + fmt.Println("Job completed:", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing + ctx.JobCreatedAt().Unix(), "after "+strconv.FormatInt(numMicroseconds, 10)+"µs") + return nil + } +} + func TestNewJobQueue(t *testing.T) { t.Parallel() @@ -217,7 +231,7 @@ func TestJobConcurrency(t *testing.T) { cleanupBadgerDB(t) // create initial job queue - jq, err := New[testJob]("/tmp/badger", "test-job", 5, testJobHandler()) + jq, err := New[testJob]("/tmp/badger", "test-job", 5, complexJobHandler()) assert.NoError(t, err) t.Cleanup(func() { assert.NoError(t, jq.Stop()) @@ -238,14 +252,27 @@ func TestJobConcurrency(t *testing.T) { // Give time for all jobs to be processed time.Sleep(time.Second) + // add a few more new jobs + for i := 20; i < 22; i++ { + j := testJob{Msg: fmt.Sprintf("hello %d", i)} + + id, err := jq.Enqueue(j) + assert.NoError(t, err) + + ids = append(ids, id) + } + + // more time for new jobs to be processed + time.Sleep(time.Second) + // Check that all jobs were processed - for i := 0; i < 10; i++ { + for id := range ids { // Check that the job is removed from the in-memory index - _, ok := jq.isJobIDInQueue.Load(ids[i]) + _, ok := jq.isJobIDInQueue.Load(uint64(id)) assert.False(t, ok) // Check that the job is no longer in the badger DB - value, err := readJob(jq.db, ids[i]) + value, err := readJob(jq.db, uint64(id)) assert.Error(t, err, badger.ErrKeyNotFound) assert.Nil(t, value) } diff --git a/timestat.go b/timestat.go index 386619d..f156d5c 100644 --- a/timestat.go +++ b/timestat.go @@ -1,6 +1,7 @@ package jobqueue import ( + "strconv" "time" ) @@ -8,7 +9,7 @@ type TimeStat struct { TotalTime time.Duration MinTime time.Duration MaxTime time.Duration - Count int + Count int64 } func (ts *TimeStat) AvgTime() time.Duration { @@ -37,5 +38,5 @@ func (ts *TimeStat) RecordTime(duration time.Duration) { } func (ts *TimeStat) String() string { - return "tot " + ts.TotalTime.String() + " avg " + ts.AvgTime().String() + " min " + ts.MinTime.String() + " max " + ts.MaxTime.String() + return "tot " + ts.TotalTime.String() + " cnt " + strconv.FormatInt(ts.Count, 10) + " avg " + ts.AvgTime().String() + " min " + ts.MinTime.String() + " max " + ts.MaxTime.String() } From 881495af4e65ce06a78ea5b150557c70f9b76a2b Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Fri, 16 Aug 2024 10:31:50 -0400 Subject: [PATCH 06/18] fix capitalization on WithInMemDb option --- jobqueue_test.go | 10 +++++----- options.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/jobqueue_test.go b/jobqueue_test.go index fbec9ff..4c4f840 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -65,7 +65,7 @@ func TestNewJobQueue(t *testing.T) { dbPath: "/tmp/test_jobqueue_1", queueName: "test-queue-1", workers: 2, - options: []Option[testJob]{WithInmemDB[testJob]()}, + options: []Option[testJob]{WithInMemDB[testJob]()}, expectedError: false, }, { @@ -73,7 +73,7 @@ func TestNewJobQueue(t *testing.T) { dbPath: "/tmp/test_jobqueue_2", queueName: "test-queue-2", workers: -1, - options: []Option[testJob]{WithInmemDB[testJob]()}, + options: []Option[testJob]{WithInMemDB[testJob]()}, expectedError: true, }, { @@ -81,7 +81,7 @@ func TestNewJobQueue(t *testing.T) { dbPath: "/tmp/test_jobqueue_3", queueName: "test-queue-3", workers: 0, - options: []Option[testJob]{WithInmemDB[testJob]()}, + options: []Option[testJob]{WithInMemDB[testJob]()}, expectedError: false, }, } @@ -118,7 +118,7 @@ func TestNewJobQueue(t *testing.T) { func TestJobQueue_Enqueue(t *testing.T) { cleanupBadgerDB(t) - jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInmemDB[testJob]()) + jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInMemDB[testJob]()) assert.NoError(t, err) t.Cleanup(func() { @@ -150,7 +150,7 @@ func TestJobQueue_Enqueue(t *testing.T) { func TestJobQueue_ProcessJob(t *testing.T) { cleanupBadgerDB(t) - jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInmemDB[testJob]()) + jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInMemDB[testJob]()) assert.NoError(t, err) t.Cleanup(func() { diff --git a/options.go b/options.go index 7139138..d58b0d4 100644 --- a/options.go +++ b/options.go @@ -25,7 +25,7 @@ func WithJobBufferSize[T any](size int) Option[T] { // WithInmemDB uses an in-memory BadgerDB instead of a persistent one. // Useful for testing, but provides no durability guarantees. -func WithInmemDB[T any]() Option[T] { +func WithInMemDB[T any]() Option[T] { return func(jq *JobQueue[T]) { jq.dbInMemory = true } From 51723421249827f7285b2880e84b1c5dbb7fe3cb Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Fri, 16 Aug 2024 10:32:21 -0400 Subject: [PATCH 07/18] move direct requires out of indirect section --- go.mod | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 5b876d2..e386370 100644 --- a/go.mod +++ b/go.mod @@ -6,13 +6,13 @@ require ( github.com/dgraph-io/badger/v4 v4.2.0 github.com/goccy/go-json v0.10.3 github.com/google/uuid v1.6.0 + github.com/loov/hrtime v1.0.3 github.com/puzpuzpuz/xsync/v3 v3.2.0 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.9.0 + go.uber.org/atomic v1.11.0 ) -require go.uber.org/atomic v1.11.0 // indirect - require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect @@ -26,7 +26,6 @@ require ( github.com/google/flatbuffers v1.12.1 // indirect github.com/klauspost/compress v1.12.3 // indirect github.com/kr/text v0.2.0 // indirect - github.com/loov/hrtime v1.0.3 github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/pkg/errors v0.9.1 // indirect From 4f8bac5532a1f693f04b5538aac64d66afd4d730 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Fri, 16 Aug 2024 10:33:03 -0400 Subject: [PATCH 08/18] add units to idle/busy debug time --- jobqueue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jobqueue.go b/jobqueue.go index 977ba23..44176cc 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -192,7 +192,7 @@ func (jq *JobQueue[T]) worker(id int) { timeSpentInState := time.Since(jq.busyStateChangeAt.Load()) jq.busyStateChangeAt.Store(time.Now()) jq.idleTime.RecordTime(timeSpentInState) - logger.Debug().Dur("timeIdle", timeSpentInState).Msg("*** Queue now busy *** ") + logger.Debug().Dur("timeIdle(ms)", timeSpentInState).Msg("*** Queue now busy *** ") } err := jq.processJob(job, id) @@ -203,7 +203,7 @@ func (jq *JobQueue[T]) worker(id int) { timeSpentInState := time.Since(jq.busyStateChangeAt.Load()) jq.busyStateChangeAt.Store(time.Now()) jq.busyTime.RecordTime(timeSpentInState) - logger.Debug().Dur("timeBusy", timeSpentInState).Msg("*** Queue now idle *** ") + logger.Debug().Dur("timeBusy(ms)", timeSpentInState).Msg("*** Queue now idle *** ") } } From 54568b06f6cdd9d36452c72f3142dc39d07942d7 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Fri, 16 Aug 2024 10:33:36 -0400 Subject: [PATCH 09/18] Add support for debugging in VS Code --- .vscode/launch.json | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..201ebc8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,20 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch", + "type": "go", + "request": "launch", + "mode": "auto", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}", + "env": {}, + "args": [], + "showLog": true + } + ] +} From 1ef651ac3faaaa523d27a86723153b681733458b Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Fri, 16 Aug 2024 14:14:52 -0400 Subject: [PATCH 10/18] Created JobQueueDB interface with BadgerDB implementation --- job.go | 9 --- jobqueue.go | 150 ++++++++++------------------------- jobqueue_db.go | 12 +++ jobqueue_db_badger.go | 180 ++++++++++++++++++++++++++++++++++++++++++ jobqueue_test.go | 39 ++------- 5 files changed, 240 insertions(+), 150 deletions(-) create mode 100644 jobqueue_db.go create mode 100644 jobqueue_db_badger.go diff --git a/job.go b/job.go index f15714b..ad63414 100644 --- a/job.go +++ b/job.go @@ -1,12 +1,9 @@ package jobqueue import ( - "fmt" "time" ) -const jobDBKeyPrefix = "job-" - // JobContext provides context for a job which is injected into the job Process method. type JobContext interface { JobID() uint64 @@ -52,9 +49,3 @@ func (j *job[T]) Process(handler func(JobContext, T) error) error { return nil } - -// dbKey BadgerDB iterates over keys in lexicographical order, so we need to make sure that the job ID -// is strictly increasing to avoid queues being processed out of order. -func (j *job[T]) dbKey() []byte { - return []byte(fmt.Sprintf("%s%d", jobDBKeyPrefix, j.ID)) -} diff --git a/jobqueue.go b/jobqueue.go index 44176cc..adcfb39 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -2,14 +2,11 @@ package jobqueue import ( "context" - "encoding/binary" "errors" "fmt" "sync" "time" - "github.com/dgraph-io/badger/v4" - "github.com/goccy/go-json" "github.com/loov/hrtime" "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" @@ -31,10 +28,10 @@ var errJobChannelFull = errors.New("job channel is closed") const defaultFetchInterval = 100 * time.Millisecond const defaultJobBufferSize = 1000 -const defaultJobIDSequenceSize = 100 +const defaultJobsPerFetch = 10 type JobQueue[T any] struct { - db *badger.DB + db JobQueueDb[T] dbPath string dbInMemory bool @@ -43,12 +40,12 @@ type JobQueue[T any] struct { cancel context.CancelFunc handler func(JobContext, T) error - jobID *badger.Sequence isJobIDInQueue *xsync.MapOf[uint64, bool] jobs chan *job[T] // Options fetchInterval time.Duration + jobsPerFetch int // Stats statsLock sync.Mutex // protects the stats below @@ -92,11 +89,11 @@ func New[T any]( cancel: nil, handler: handler, - jobID: nil, isJobIDInQueue: xsync.NewMapOf[uint64, bool](), jobs: make(chan *job[T], defaultJobBufferSize), fetchInterval: defaultFetchInterval, + jobsPerFetch: defaultJobsPerFetch, statsLock: sync.Mutex{}, jobRunTime: TimeStat{}, @@ -117,7 +114,10 @@ func New[T any]( opt(jq) } - db, err := jq.openDB() + // TODO: figure out better way to do options for JobQueue DB + db := NewJobQueueDbBadger[T](jq.dbInMemory) // hardcoding BadgerDB for now. Add option for other DBs later + + err := db.Open(dbPath, name) if err != nil { return nil, err } @@ -128,12 +128,7 @@ func New[T any]( ctx, cancel := context.WithCancel(context.Background()) jq.cancel = cancel - jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize) - if err != nil { - return nil, fmt.Errorf("failed to start job id sequence: %w", err) - } - - // Load jobs from BadgerDB + // Load jobs from JobQueue DB go jq.pollJobs(ctx) // Start workers @@ -146,24 +141,15 @@ func New[T any]( } func (jq *JobQueue[T]) Enqueue(payload T) (uint64, error) { - id, err := jq.jobID.Next() + // TODO: simplify by getting and setting the job ID when we add it, rather than on creation + id, err := jq.db.GetNextJobId() if err != nil { return 0, fmt.Errorf("failed to get next job id: %w", err) } - // Create a new job and store it in BadgerDB + // Create a new job and store it in queue's database job := newJob(id, payload) - jobBytes, err := json.Marshal(job) - if err != nil { - return 0, fmt.Errorf("failed to marshal job: %w", err) - } - - err = jq.db.Update(func(txn *badger.Txn) error { - if err := txn.Set(job.dbKey(), jobBytes); err != nil { - return fmt.Errorf("failed to store job: %w", err) - } - return nil - }) + _, err = jq.db.AddJob(job) if err != nil { jq.logger.Error().Err(err).Uint64("jobID", job.ID).Msg("Failed to enqueue job") return 0, err @@ -241,14 +227,9 @@ func (jq *JobQueue[T]) processJob(job *job[T], worker int) error { jq.statsLock.Unlock() logger.Info().Msg("Job processed successfully") - // Now that we've successfully processed the job, we can remove it from BadgerDB - jq.logger.Debug().Uint64("jobID", job.ID).Int("worker", worker).Msg("Removing job from BadgerDB") - err = jq.db.Update(func(txn *badger.Txn) error { - if err := txn.Delete(job.dbKey()); err != nil { - return err - } - return nil - }) + // Now that we've successfully processed the job, we can remove it from JobQueue DB + jq.logger.Debug().Uint64("jobID", job.ID).Int("worker", worker).Msg("Removing job from JobQueue DB") + err = jq.db.DeleteJob(job.ID) if err != nil { logger.Error().Err(err).Msg("Failed to remove completed job from db") return err @@ -264,8 +245,8 @@ func (jq *JobQueue[T]) processJob(job *job[T], worker int) error { func (jq *JobQueue[T]) Stop() error { jq.logger.Info().Msg("Stopping job queue") - // Stop jobs fetch from BadgerDB - jq.logger.Debug().Msg("Stopping jobs fetch from BadgerDB") + // Stop jobs fetch from JobQueue DB + jq.logger.Debug().Msg("Stopping jobs fetch from JobQueue DB") jq.cancel() // Close the channel to signal the workers to stop @@ -275,13 +256,10 @@ func (jq *JobQueue[T]) Stop() error { jq.logger.Debug().Msg("Waiting for workers to finish") jq.wg.Wait() - // Close Badger DB connection - jq.logger.Debug().Msg("Closing Badger DB connection") - if err := jq.jobID.Release(); err != nil { - jq.logger.Error().Err(err).Msg("Failed to release next job id sequence") - } + // Close JobQueue DB connection + jq.logger.Debug().Msg("Closing JobQueue DB connection") if err := jq.db.Close(); err != nil { - jq.logger.Error().Err(err).Msg("Failed to close Badger DB connection") + jq.logger.Error().Err(err).Msg("Failed to close JobQueue DB connection") return err } @@ -303,7 +281,7 @@ func (jq *JobQueue[T]) Stop() error { return nil } -// pollJobs is a long-running goroutine that fetches jobs from BadgerDB and sends them to the worker channels. +// pollJobs is a long-running goroutine that fetches jobs from the JobQueue DB and sends them to the worker channels. func (jq *JobQueue[T]) pollJobs(ctx context.Context) { ticker := time.NewTicker(jq.fetchInterval) @@ -322,73 +300,31 @@ func (jq *JobQueue[T]) pollJobs(ctx context.Context) { } func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit - err := jq.db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 10 - it := txn.NewIterator(opts) - defer it.Close() - - for it.Seek([]byte(jobDBKeyPrefix)); it.ValidForPrefix([]byte(jobDBKeyPrefix)); it.Next() { - item := it.Item() - err := item.Value(func(v []byte) error { - var job job[T] - if err := json.Unmarshal(v, &job); err != nil { - jq.logger.Error().Err(err).Uint64("jobID", - binary.BigEndian.Uint64(item.Key())).Msg("Failed to unmarshal job") - return err - } - - if job.Status == JobStatusPending { - // If the job is already fetched, skip it - _, ok := jq.isJobIDInQueue.Load(job.ID) - if ok { - return nil - } - - select { - case <-ctx.Done(): - jq.logger.Debug().Msg("Context cancelled, stopping iteration") - break - - case jq.jobs <- &job: - jq.isJobIDInQueue.Store(job.ID, true) - jq.logger.Debug().Uint64("jobID", job.ID).Msg("New pending job found and sent to worker") - - default: - jq.logger.Warn().Uint64("JobID", - job.ID).Msg("Found pending jobs, but job channel is full") - return errJobChannelFull - } - } - - return nil - }) - if err != nil { - return err - } - } - return nil - }) + jobs, err := jq.db.FetchJobs(jq.jobsPerFetch) if err != nil { return fmt.Errorf("failed to fetch jobs: %w", err) } + for _, job := range jobs { + if job.Status == JobStatusPending { + // If the job is already fetched, skip it + _, ok := jq.isJobIDInQueue.Load(job.ID) + if ok { + continue + } + } + select { + case <-ctx.Done(): + jq.logger.Debug().Msg("Context cancelled, stopping iteration") + return nil // stop the fetch loop, but don't return an error - return nil -} - -func (jq *JobQueue[T]) openDB() (*badger.DB, error) { - var opts badger.Options - if jq.dbInMemory { - opts = badger.DefaultOptions("").WithInMemory(true) - } else { - opts = badger.DefaultOptions(jq.dbPath) - } - opts.Logger = nil + case jq.jobs <- job: + jq.isJobIDInQueue.Store(job.ID, true) + jq.logger.Debug().Uint64("jobID", job.ID).Msg("New job found and sent to worker") - db, err := badger.Open(opts) - if err != nil { - return nil, fmt.Errorf("failed to open BadgerDB: %w", err) + default: + jq.logger.Warn().Uint64("JobID", job.ID).Msg("Found jobs, but job channel is full") + return errJobChannelFull + } } - - return db, nil + return nil } diff --git a/jobqueue_db.go b/jobqueue_db.go new file mode 100644 index 0000000..fbc1440 --- /dev/null +++ b/jobqueue_db.go @@ -0,0 +1,12 @@ +package jobqueue + +type JobQueueDb[T any] interface { + Open(path string, queueName string) error + Close() error + GetNextJobId() (uint64, error) + FetchJobs(count int) ([]*job[T], error) + ReadJob(jobID uint64) (*job[T], error) + UpdateJob(job *job[T]) error + AddJob(job *job[T]) (uint64, error) // returns the job ID + DeleteJob(jobID uint64) error +} diff --git a/jobqueue_db_badger.go b/jobqueue_db_badger.go new file mode 100644 index 0000000..58a7128 --- /dev/null +++ b/jobqueue_db_badger.go @@ -0,0 +1,180 @@ +package jobqueue + +import ( + "encoding/binary" + "fmt" + + "github.com/dgraph-io/badger/v4" + "github.com/goccy/go-json" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +const defaultJobIDSequenceSize = 100 +const jobDBKeyPrefix = "job-" +const jobPrefetchSize = 10 + +type JobQueueDbBadger[T any] struct { + db *badger.DB + dbPath string + dbInMemory bool + jobID *badger.Sequence + logger zerolog.Logger +} + +func NewJobQueueDbBadger[T any](inMemory bool) JobQueueDb[T] { + return &JobQueueDbBadger[T]{ + db: nil, + dbPath: "", + dbInMemory: inMemory, + jobID: nil, + logger: log.With().Str("module", "JobQueue").Str("dbType", "Badger").Logger(), + } +} + +func (jqdb *JobQueueDbBadger[T]) Open(path string, queueName string) error { + jqdb.dbPath = path + + var opts badger.Options + if jqdb.dbInMemory { + opts = badger.DefaultOptions("").WithInMemory(true) + } else { + opts = badger.DefaultOptions(jqdb.dbPath) + } + opts.Logger = nil + + // open the BadgerDB + db, err := badger.Open(opts) + if err != nil { + return fmt.Errorf("failed to open BadgerDB: %w", err) + } + jqdb.db = db + + // setup the job ID sequence + jobID, err := jqdb.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize) + if err != nil { + return fmt.Errorf("failed to get next job ID sequence: %w", err) + } else { + jqdb.jobID = jobID + } + return err +} + +func (jqdb *JobQueueDbBadger[T]) Close() error { + //jqdb.logger.Debug().Msg("Closing Badger DB connection") + if err := jqdb.jobID.Release(); err != nil { + //jqdb.logger.Error().Err(err).Msg("Failed to release next job id sequence") + } + if err := jqdb.db.Close(); err != nil { + //jqdb.logger.Error().Err(err).Msg("Failed to close Badger DB connection") + return err + } + return nil +} + +func (jqdb *JobQueueDbBadger[T]) GetNextJobId() (uint64, error) { + id, err := jqdb.jobID.Next() + return id, err +} + +func (jqdb *JobQueueDbBadger[T]) FetchJobs(count int) ([]*job[T], error) { + // create a new array of jobs + jobs := make([]*job[T], 0, count) + + err := jqdb.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = jobPrefetchSize + it := txn.NewIterator(opts) + defer it.Close() + + for it.Seek([]byte(jobDBKeyPrefix)); it.ValidForPrefix([]byte(jobDBKeyPrefix)); it.Next() { + item := it.Item() + err := item.Value(func(v []byte) error { + var job job[T] + if err := json.Unmarshal(v, &job); err != nil { + jqdb.logger.Error().Err(err).Uint64("jobID", + binary.BigEndian.Uint64(item.Key())).Msg("Failed to unmarshal job") + return err + } + jobs = append(jobs, &job) + return nil + }) + if err != nil { + jqdb.logger.Error().Err(err).Uint64("jobID", + binary.BigEndian.Uint64(item.Key())).Msg("Failed fetch job") + return err + } + } + return nil + }) + if err != nil && len(jobs) == 0 { + // only return an error if we didn't fetch any jobs at all. If we fetched some jobs, we can still process them. + return nil, fmt.Errorf("failed to fetch any jobs: %w", err) + } + + return jobs, nil +} + +func (jqdb *JobQueueDbBadger[T]) ReadJob(jobID uint64) (*job[T], error) { + var val []byte + var theItem *badger.Item + err := jqdb.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(dbKeyForJob(jobID))) + if err != nil { + return err + } + theItem = item + val, err = item.ValueCopy(nil) + return err + }) + if err != nil { + return nil, fmt.Errorf("failed to read job: %w", err) + } + var theJob job[T] + if err := json.Unmarshal(val, &theJob); err != nil { + jqdb.logger.Error().Err(err).Uint64("jobID", + binary.BigEndian.Uint64(theItem.Key())).Msg("Failed to unmarshal job") + return nil, err + } + if err != nil { + return nil, err + } + + return &theJob, nil +} + +func (jqdb *JobQueueDbBadger[T]) UpdateJob(job *job[T]) error { + return nil +} + +func (jqdb *JobQueueDbBadger[T]) AddJob(job *job[T]) (uint64, error) { + jobBytes, err := json.Marshal(job) + if err != nil { + return 0, fmt.Errorf("failed to marshal job: %w", err) + } + + err = jqdb.db.Update(func(txn *badger.Txn) error { + if err := txn.Set(dbKeyForJob(job.ID), jobBytes); err != nil { + return fmt.Errorf("failed to store job: %w", err) + } + return nil + }) + return job.ID, err +} + +func (jqdb *JobQueueDbBadger[T]) DeleteJob(jobID uint64) error { + err := jqdb.db.Update(func(txn *badger.Txn) error { + if err := txn.Delete(dbKeyForJob(jobID)); err != nil { + return err + } + return nil + }) + return err +} + +// dbKey BadgerDB iterates over keys in lexicographical order, so we need to make sure that the job ID +// is strictly increasing to avoid queues being processed out of order. +// FIXME: not sure this is relevant anymore, since we can execute jobs in parallel +func dbKeyForJob(jobId uint64) []byte { + return []byte(fmt.Sprintf("%s%d", jobDBKeyPrefix, jobId)) +} diff --git a/jobqueue_test.go b/jobqueue_test.go index 4c4f840..8536ed0 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/dgraph-io/badger/v4" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" @@ -103,7 +102,6 @@ func TestNewJobQueue(t *testing.T) { require.NotNil(t, jq) assert.NotNil(t, jq.db) - assert.NotNil(t, jq.jobID) assert.NotNil(t, jq.isJobIDInQueue) assert.NotNil(t, jq.jobs) @@ -132,11 +130,7 @@ func TestJobQueue_Enqueue(t *testing.T) { assert.NoError(t, err) // Verify that the job was stored in badger DB - value, err := readJob(jq.db, id) - assert.NoError(t, err) - - var dbJob job[testJob] - err = json.Unmarshal(value, &dbJob) + dbJob, err := jq.db.ReadJob(id) assert.NoError(t, err) // Verify that the job is what we're expecting @@ -186,9 +180,9 @@ func TestJobQueue_ProcessJob(t *testing.T) { assert.False(t, ok) // Check that the job is no longer in the badger DB - value, err := readJob(jq.db, ids[i]) + dbJob, err := jq.db.ReadJob(ids[i]) assert.Error(t, err, badger.ErrKeyNotFound) - assert.Nil(t, value) + assert.Nil(t, dbJob) } } @@ -272,34 +266,11 @@ func TestJobConcurrency(t *testing.T) { assert.False(t, ok) // Check that the job is no longer in the badger DB - value, err := readJob(jq.db, uint64(id)) + job, err := jq.db.ReadJob(uint64(id)) assert.Error(t, err, badger.ErrKeyNotFound) - assert.Nil(t, value) - } - -} - -func readJob(db *badger.DB, id uint64) ([]byte, error) { - return readKey(db, fmt.Sprintf("%s%d", jobDBKeyPrefix, id)) -} - -func readKey(db *badger.DB, key string) ([]byte, error) { - var valCopy []byte - err := db.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte(key)) - if err != nil { - return err - } - - valCopy, err = item.ValueCopy(nil) - return err - }) - - if err != nil { - return nil, err + assert.Nil(t, job) } - return valCopy, nil } func cleanupBadgerDB(t *testing.T) { From 588ff28e9a7869b939a7bf26625fd4498cefec0d Mon Sep 17 00:00:00 2001 From: edmund-zavada <154269028+edmund-zavada@users.noreply.github.com> Date: Fri, 16 Aug 2024 16:55:03 -0400 Subject: [PATCH 11/18] minor cleanup --- jobqueue_db_badger.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/jobqueue_db_badger.go b/jobqueue_db_badger.go index 58a7128..a0087f8 100644 --- a/jobqueue_db_badger.go +++ b/jobqueue_db_badger.go @@ -63,10 +63,10 @@ func (jqdb *JobQueueDbBadger[T]) Open(path string, queueName string) error { func (jqdb *JobQueueDbBadger[T]) Close() error { //jqdb.logger.Debug().Msg("Closing Badger DB connection") if err := jqdb.jobID.Release(); err != nil { - //jqdb.logger.Error().Err(err).Msg("Failed to release next job id sequence") + jqdb.logger.Error().Err(err).Msg("Failed to release next job id sequence") } if err := jqdb.db.Close(); err != nil { - //jqdb.logger.Error().Err(err).Msg("Failed to close Badger DB connection") + jqdb.logger.Error().Err(err).Msg("Failed to close Badger DB connection") return err } return nil @@ -136,9 +136,6 @@ func (jqdb *JobQueueDbBadger[T]) ReadJob(jobID uint64) (*job[T], error) { binary.BigEndian.Uint64(theItem.Key())).Msg("Failed to unmarshal job") return nil, err } - if err != nil { - return nil, err - } return &theJob, nil } From bf73f65dcbf8cc421e99f7eb637fc8fc8e8e2b9f Mon Sep 17 00:00:00 2001 From: edmund-zavada <154269028+edmund-zavada@users.noreply.github.com> Date: Fri, 16 Aug 2024 18:19:42 -0400 Subject: [PATCH 12/18] Incomplete work toward adding mongo db option - added new options to the JobQueue for UseMongoDB and setting jobs per fetch - partial implementation of jobqueue_db_mongo to provide a MongoDB implementation of the JobQueueDb interface - commented out section that will test the mongo version (tests fail when uncommented) --- go.mod | 15 ++++++-- go.sum | 39 +++++++++++++++++++++ jobqueue.go | 13 +++++-- jobqueue_db_mongo.go | 83 ++++++++++++++++++++++++++++++++++++++++++++ jobqueue_test.go | 7 ++++ options.go | 31 ++++++++++++++++- 6 files changed, 181 insertions(+), 7 deletions(-) create mode 100644 jobqueue_db_mongo.go diff --git a/go.mod b/go.mod index e386370..fc3a0a6 100644 --- a/go.mod +++ b/go.mod @@ -22,17 +22,26 @@ require ( github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/golang/snappy v0.0.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v1.12.1 // indirect - github.com/klauspost/compress v1.12.3 // indirect + github.com/klauspost/compress v1.13.6 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + go.mongodb.org/mongo-driver v1.16.1 // indirect go.opencensus.io v0.22.5 // indirect - golang.org/x/net v0.7.0 // indirect + golang.org/x/crypto v0.22.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8daad32..6ca1766 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -46,6 +48,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -58,6 +62,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -71,8 +77,19 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4l8= +go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -80,12 +97,16 @@ go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -94,8 +115,12 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -103,19 +128,32 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -123,6 +161,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/jobqueue.go b/jobqueue.go index adcfb39..f3e78a5 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -34,6 +34,7 @@ type JobQueue[T any] struct { db JobQueueDb[T] dbPath string dbInMemory bool + dbUseMongo bool wg sync.WaitGroup logger zerolog.Logger @@ -83,6 +84,7 @@ func New[T any]( db: nil, dbPath: dbPath, dbInMemory: false, + dbUseMongo: false, wg: sync.WaitGroup{}, logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(), @@ -114,9 +116,14 @@ func New[T any]( opt(jq) } - // TODO: figure out better way to do options for JobQueue DB - db := NewJobQueueDbBadger[T](jq.dbInMemory) // hardcoding BadgerDB for now. Add option for other DBs later - + // Open JobQueue DB + var db JobQueueDb[T] + if jq.dbUseMongo { + dbPath = jq.dbPath // this will have been set by the options + db = NewJobQueueDbMongo[T]() + } else { + db = NewJobQueueDbBadger[T](jq.dbInMemory) + } err := db.Open(dbPath, name) if err != nil { return nil, err diff --git a/jobqueue_db_mongo.go b/jobqueue_db_mongo.go new file mode 100644 index 0000000..aa01e93 --- /dev/null +++ b/jobqueue_db_mongo.go @@ -0,0 +1,83 @@ +package jobqueue + +import ( + "context" + "fmt" + + // "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// JobQueueDbMongo is the MongoDB implementation of the JobQueueDb interface +type JobQueueDbMongo[T any] struct { + client *mongo.Client + ctx context.Context + db *mongo.Database + coll *mongo.Collection +} + +// NewJobQueueDbMongo creates a new JobQueueDbMongo instance +func NewJobQueueDbMongo[T any]() JobQueueDb[T] { + return &JobQueueDbMongo[T]{ + client: nil, + ctx: context.TODO(), + db: nil, + coll: nil, + } +} + +// Open the MongoDB database +func (jqdb *JobQueueDbMongo[T]) Open(path string, queueName string) error { + client, err := mongo.Connect(jqdb.ctx, options.Client().ApplyURI(path)) + if err != nil { + return fmt.Errorf("failed to connect to MongoDB at %s: %w", path, err) + } + jqdb.client = client + // TODO: handle mongo db options + jqdb.db = client.Database("job_queues") + if jqdb.db == nil { + return fmt.Errorf("failed to open mongo database job_queues") + } + jqdb.coll = jqdb.db.Collection(queueName + "_jobs") + if jqdb.coll == nil { + return fmt.Errorf("failed to open collection job_queues.%s_jobs", queueName) + } + return nil +} + +// Close the MongoDB database +func (jqdb *JobQueueDbMongo[T]) Close() error { + err := jqdb.client.Disconnect(jqdb.ctx) + return err +} + +// GetNextJobId() (uint64, error) +func (jqdb *JobQueueDbMongo[T]) GetNextJobId() (uint64, error) { + return 0, nil +} + +// FetchJobs(count int) ([]*job[T], error) +func (jqdb *JobQueueDbMongo[T]) FetchJobs(count int) ([]*job[T], error) { + return nil, nil +} + +// ReadJob(jobID uint64) (*job[T], error) +func (jqdb *JobQueueDbMongo[T]) ReadJob(jobID uint64) (*job[T], error) { + return nil, nil +} + +// UpdateJob(job *job[T]) error +func (jqdb *JobQueueDbMongo[T]) UpdateJob(job *job[T]) error { + return nil +} + +// AddJob(job *job[T]) (uint64, error) // returns the job ID +func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error) { + return 0, nil +} + +// DeleteJob(jobID uint64) error +func (jqdb *JobQueueDbMongo[T]) DeleteJob(jobID uint64) error { + return nil +} diff --git a/jobqueue_test.go b/jobqueue_test.go index 8536ed0..62738d7 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -225,7 +225,14 @@ func TestJobConcurrency(t *testing.T) { cleanupBadgerDB(t) // create initial job queue + + // mongo version + //jq, err := New[testJob]("ignored", "test-job", 5, complexJobHandler(), + // UseMongoDB[testJob]("mongodb://localhost:27017")) + + // badger version jq, err := New[testJob]("/tmp/badger", "test-job", 5, complexJobHandler()) + assert.NoError(t, err) t.Cleanup(func() { assert.NoError(t, jq.Stop()) diff --git a/options.go b/options.go index d58b0d4..5a7a115 100644 --- a/options.go +++ b/options.go @@ -25,8 +25,37 @@ func WithJobBufferSize[T any](size int) Option[T] { // WithInmemDB uses an in-memory BadgerDB instead of a persistent one. // Useful for testing, but provides no durability guarantees. +// if we previously called UseMongoDB, we will warn and ignore this option. func WithInMemDB[T any]() Option[T] { return func(jq *JobQueue[T]) { - jq.dbInMemory = true + if jq.dbUseMongo { + jq.logger.Warn().Msg("Ignoring WithInMemDB option, not compatible with UseMongoDB option") + } else { + jq.logger.Debug().Msg("Using Badger In-Memory DB for Job Queue DB") + jq.dbInMemory = true + } + } +} + +// how many jobs at once are retrieved from the DB in a single fetch operation +func WithJobsPerFetch[T any](count int) Option[T] { + return func(jq *JobQueue[T]) { + jq.logger.Debug().Msg(fmt.Sprintf("Jobs per fetch set to %d", count)) + jq.jobsPerFetch = count + } +} + +// UseMongoDB sets the JobQueue to use MongoDB instead of BadgerDB. +// if WithInMemDB was previously called, we will warn and ignore this option. +func UseMongoDB[T any](uri string) Option[T] { + return func(jq *JobQueue[T]) { + if jq.dbInMemory { + jq.logger.Warn().Msg("Ignoring UseMongoDB option, not compatible with WithInMemDB option") + } else { + jq.logger.Debug().Msg(fmt.Sprintf("Using Mongo DB at %s for Job Queue DB", uri)) + jq.dbInMemory = false + jq.dbPath = uri + jq.dbUseMongo = true + } } } From 6c2d5df6de827704ba612b503481d6f9fb1d0599 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Sat, 17 Aug 2024 17:15:34 -0400 Subject: [PATCH 13/18] GetNextJobId() and AddJob() are working for Mongo --- jobqueue.go | 8 +++--- jobqueue_db_mongo.go | 66 ++++++++++++++++++++++++++++++++++---------- jobqueue_test.go | 6 ++-- 3 files changed, 59 insertions(+), 21 deletions(-) diff --git a/jobqueue.go b/jobqueue.go index f3e78a5..a59b45e 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -116,11 +116,14 @@ func New[T any]( opt(jq) } + ctx, cancel := context.WithCancel(context.Background()) + jq.cancel = cancel + // Open JobQueue DB var db JobQueueDb[T] if jq.dbUseMongo { dbPath = jq.dbPath // this will have been set by the options - db = NewJobQueueDbMongo[T]() + db = NewJobQueueDbMongo[T](ctx) } else { db = NewJobQueueDbBadger[T](jq.dbInMemory) } @@ -132,9 +135,6 @@ func New[T any]( jq.logger.Info().Msg("Starting job queue") - ctx, cancel := context.WithCancel(context.Background()) - jq.cancel = cancel - // Load jobs from JobQueue DB go jq.pollJobs(ctx) diff --git a/jobqueue_db_mongo.go b/jobqueue_db_mongo.go index aa01e93..5709cad 100644 --- a/jobqueue_db_mongo.go +++ b/jobqueue_db_mongo.go @@ -2,28 +2,32 @@ package jobqueue import ( "context" + "errors" "fmt" - // "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) // JobQueueDbMongo is the MongoDB implementation of the JobQueueDb interface type JobQueueDbMongo[T any] struct { - client *mongo.Client - ctx context.Context - db *mongo.Database - coll *mongo.Collection + client *mongo.Client + ctx context.Context + db *mongo.Database + coll *mongo.Collection + idColl *mongo.Collection + jobQueueName string } // NewJobQueueDbMongo creates a new JobQueueDbMongo instance -func NewJobQueueDbMongo[T any]() JobQueueDb[T] { +func NewJobQueueDbMongo[T any](ctx context.Context) JobQueueDb[T] { return &JobQueueDbMongo[T]{ - client: nil, - ctx: context.TODO(), - db: nil, - coll: nil, + client: nil, + ctx: ctx, + db: nil, + coll: nil, + jobQueueName: "", } } @@ -39,9 +43,16 @@ func (jqdb *JobQueueDbMongo[T]) Open(path string, queueName string) error { if jqdb.db == nil { return fmt.Errorf("failed to open mongo database job_queues") } - jqdb.coll = jqdb.db.Collection(queueName + "_jobs") + // holds the jobs for the queue + jqdb.jobQueueName = queueName + "_jobs" + jqdb.coll = jqdb.db.Collection(jqdb.jobQueueName) if jqdb.coll == nil { - return fmt.Errorf("failed to open collection job_queues.%s_jobs", queueName) + return fmt.Errorf("failed to open collection job_queues.%s", jqdb.jobQueueName) + } + // holds the job IDs for all queues + jqdb.idColl = jqdb.db.Collection("job_ids") + if jqdb.idColl == nil { + return fmt.Errorf("failed to open collection job_queues.job_ids") } return nil } @@ -54,7 +65,30 @@ func (jqdb *JobQueueDbMongo[T]) Close() error { // GetNextJobId() (uint64, error) func (jqdb *JobQueueDbMongo[T]) GetNextJobId() (uint64, error) { - return 0, nil + + var nextJobId uint64 + result := jqdb.idColl.FindOneAndUpdate(jqdb.ctx, + bson.D{{Key: "queue", Value: jqdb.jobQueueName}}, // selector + bson.D{{Key: "$inc", Value: bson.D{{Key: "next_job_id", Value: 1}}}}) // update, increment next_job_id by 1, return old record + if result.Err() != nil { + if errors.Is(result.Err(), mongo.ErrNoDocuments) { + // insert the queue if it doesn't exist. We start with ID 2 because we are returning 1 + _, err := jqdb.idColl.InsertOne(jqdb.ctx, bson.D{{Key: "queue", Value: jqdb.jobQueueName}, {Key: "next_job_id", Value: 2}}) + if err != nil { + return 0, fmt.Errorf("failed to create initial mongo record for next job id: %w", err) + } + nextJobId = 1 + } else { + return 0, fmt.Errorf("failed to get next job id: %w", result.Err()) + } + } + raw, err := result.Raw() + if err != nil { + return 0, fmt.Errorf("failed to get raw result from mongo: %w", err) + } + val := raw.Lookup("next_job_id") + nextJobId = uint64(val.AsInt64()) + return nextJobId, nil } // FetchJobs(count int) ([]*job[T], error) @@ -74,7 +108,11 @@ func (jqdb *JobQueueDbMongo[T]) UpdateJob(job *job[T]) error { // AddJob(job *job[T]) (uint64, error) // returns the job ID func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error) { - return 0, nil + _, err := jqdb.coll.InsertOne(jqdb.ctx, job) + if err != nil { + return 0, fmt.Errorf("failed to insert job into mongo collection: %w", err) + } + return job.ID, nil } // DeleteJob(jobID uint64) error diff --git a/jobqueue_test.go b/jobqueue_test.go index 62738d7..711a8c8 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -227,11 +227,11 @@ func TestJobConcurrency(t *testing.T) { // create initial job queue // mongo version - //jq, err := New[testJob]("ignored", "test-job", 5, complexJobHandler(), - // UseMongoDB[testJob]("mongodb://localhost:27017")) + jq, err := New[testJob]("ignored", "test-job", 5, complexJobHandler(), + UseMongoDB[testJob]("mongodb://localhost:27017")) // badger version - jq, err := New[testJob]("/tmp/badger", "test-job", 5, complexJobHandler()) + //jq, err := New[testJob]("/tmp/badger", "test-job", 5, complexJobHandler()) assert.NoError(t, err) t.Cleanup(func() { From d5c7f6af1f447831a4841b43ae671072f1408742 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Sun, 18 Aug 2024 08:56:48 -0400 Subject: [PATCH 14/18] Cleanup options, More Mongo implementation - Removed path from JobQueue creation - Added WithBadgerDB(path) option - Simplified JobQueue.Enqueue() to get id from JobQueueDB.AddJob() - Refactored TestConcurrency to be called from TestBadgerJobConcurrency and from TestMongoJobConcurrency - Removed unused JobQueueDb.UpdateJob() - Implemented AddJob() and DeleteJob() in Mongo JobQueueDb --- job.go | 4 +-- jobqueue.go | 44 +++++++++++++++--------------- jobqueue_db.go | 8 +++++- jobqueue_db_badger.go | 12 ++++++--- jobqueue_db_mongo.go | 37 ++++++++++++++++++++------ jobqueue_test.go | 62 ++++++++++++++++++++++++++----------------- options.go | 43 +++++++++++++++++++++--------- 7 files changed, 135 insertions(+), 75 deletions(-) diff --git a/job.go b/job.go index ad63414..77e9484 100644 --- a/job.go +++ b/job.go @@ -21,9 +21,9 @@ type job[T any] struct { CreatedAt time.Time `json:"created_at"` } -func newJob[T any](id uint64, payload T) *job[T] { +func newJob[T any](payload T) *job[T] { return &job[T]{ - ID: id, + ID: 0, // ID is set when the job is added to the queue Payload: payload, Status: JobStatusPending, CreatedAt: time.Now(), diff --git a/jobqueue.go b/jobqueue.go index a59b45e..c98c2f0 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -31,10 +31,11 @@ const defaultJobBufferSize = 1000 const defaultJobsPerFetch = 10 type JobQueue[T any] struct { - db JobQueueDb[T] - dbPath string - dbInMemory bool - dbUseMongo bool + db JobQueueDb[T] + dbPath string + dbInMemory bool + dbUseMongo bool + dbUseBadger bool wg sync.WaitGroup logger zerolog.Logger @@ -71,8 +72,7 @@ type JobQueue[T any] struct { // New creates a new JobQueue with the specified database, name, and number // of worker goroutines. It initializes the job queue, starts the worker goroutines, // and returns the JobQueue instance and an error, if any. -func New[T any]( - dbPath string, name string, workers int, handler func(JobContext, T) error, opts ...Option[T], +func New[T any](name string, workers int, handler func(JobContext, T) error, opts ...Option[T], ) (*JobQueue[T], error) { if workers < 0 { return nil, errors.New("invalid number of workers") @@ -81,10 +81,11 @@ func New[T any]( } jq := &JobQueue[T]{ - db: nil, - dbPath: dbPath, - dbInMemory: false, - dbUseMongo: false, + db: nil, + dbPath: "", + dbInMemory: false, + dbUseMongo: false, + dbUseBadger: false, wg: sync.WaitGroup{}, logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(), @@ -115,6 +116,10 @@ func New[T any]( for _, opt := range opts { opt(jq) } + // make sure we have a valid db option, default to in memory if none provided + if !jq.dbUseBadger && !jq.dbUseMongo && !jq.dbInMemory { + jq.dbInMemory = true + } ctx, cancel := context.WithCancel(context.Background()) jq.cancel = cancel @@ -122,12 +127,11 @@ func New[T any]( // Open JobQueue DB var db JobQueueDb[T] if jq.dbUseMongo { - dbPath = jq.dbPath // this will have been set by the options db = NewJobQueueDbMongo[T](ctx) } else { db = NewJobQueueDbBadger[T](jq.dbInMemory) } - err := db.Open(dbPath, name) + err := db.Open(jq.dbPath, name) if err != nil { return nil, err } @@ -148,25 +152,19 @@ func New[T any]( } func (jq *JobQueue[T]) Enqueue(payload T) (uint64, error) { - // TODO: simplify by getting and setting the job ID when we add it, rather than on creation - id, err := jq.db.GetNextJobId() - if err != nil { - return 0, fmt.Errorf("failed to get next job id: %w", err) - } - // Create a new job and store it in queue's database - job := newJob(id, payload) - _, err = jq.db.AddJob(job) + job := newJob(payload) + id, err := jq.db.AddJob(job) if err != nil { - jq.logger.Error().Err(err).Uint64("jobID", job.ID).Msg("Failed to enqueue job") + jq.logger.Error().Err(err).Uint64("jobID", id).Msg("Failed to enqueue job") return 0, err } jq.statsLock.Lock() jq.jobsEnqueued++ jq.statsLock.Unlock() - jq.logger.Info().Uint64("jobID", job.ID).Msg("job enqueued successfully") - return job.ID, nil + jq.logger.Info().Uint64("jobID", id).Msg("job enqueued successfully") + return id, nil } // worker processes jobs received from the job queue and logs any errors encountered. diff --git a/jobqueue_db.go b/jobqueue_db.go index fbc1440..87733e5 100644 --- a/jobqueue_db.go +++ b/jobqueue_db.go @@ -1,12 +1,18 @@ package jobqueue +import ( + "errors" +) + type JobQueueDb[T any] interface { Open(path string, queueName string) error Close() error GetNextJobId() (uint64, error) FetchJobs(count int) ([]*job[T], error) ReadJob(jobID uint64) (*job[T], error) - UpdateJob(job *job[T]) error AddJob(job *job[T]) (uint64, error) // returns the job ID DeleteJob(jobID uint64) error } + +// returned by ReadJob or UpdateJob if the job is not found +var ErrJobNotFound = errors.New("job not found") diff --git a/jobqueue_db_badger.go b/jobqueue_db_badger.go index a0087f8..902fefd 100644 --- a/jobqueue_db_badger.go +++ b/jobqueue_db_badger.go @@ -128,6 +128,9 @@ func (jqdb *JobQueueDbBadger[T]) ReadJob(jobID uint64) (*job[T], error) { return err }) if err != nil { + if err == badger.ErrKeyNotFound { + return nil, ErrJobNotFound + } return nil, fmt.Errorf("failed to read job: %w", err) } var theJob job[T] @@ -140,11 +143,12 @@ func (jqdb *JobQueueDbBadger[T]) ReadJob(jobID uint64) (*job[T], error) { return &theJob, nil } -func (jqdb *JobQueueDbBadger[T]) UpdateJob(job *job[T]) error { - return nil -} - func (jqdb *JobQueueDbBadger[T]) AddJob(job *job[T]) (uint64, error) { + id, err := jqdb.GetNextJobId() + if err != nil { + return 0, fmt.Errorf("failed to get next job id: %w", err) + } + job.ID = id jobBytes, err := json.Marshal(job) if err != nil { return 0, fmt.Errorf("failed to marshal job: %w", err) diff --git a/jobqueue_db_mongo.go b/jobqueue_db_mongo.go index 5709cad..d8f0ecf 100644 --- a/jobqueue_db_mongo.go +++ b/jobqueue_db_mongo.go @@ -44,7 +44,7 @@ func (jqdb *JobQueueDbMongo[T]) Open(path string, queueName string) error { return fmt.Errorf("failed to open mongo database job_queues") } // holds the jobs for the queue - jqdb.jobQueueName = queueName + "_jobs" + jqdb.jobQueueName = dbCollectionNameForQueue(queueName) jqdb.coll = jqdb.db.Collection(jqdb.jobQueueName) if jqdb.coll == nil { return fmt.Errorf("failed to open collection job_queues.%s", jqdb.jobQueueName) @@ -98,17 +98,29 @@ func (jqdb *JobQueueDbMongo[T]) FetchJobs(count int) ([]*job[T], error) { // ReadJob(jobID uint64) (*job[T], error) func (jqdb *JobQueueDbMongo[T]) ReadJob(jobID uint64) (*job[T], error) { - return nil, nil -} - -// UpdateJob(job *job[T]) error -func (jqdb *JobQueueDbMongo[T]) UpdateJob(job *job[T]) error { - return nil + result := jqdb.coll.FindOne(jqdb.ctx, bson.D{{Key: "id", Value: jobID}}) + if result.Err() != nil { + if errors.Is(result.Err(), mongo.ErrNoDocuments) { + return nil, ErrJobNotFound + } + return nil, fmt.Errorf("failed to read job from mongo collection: %w", result.Err()) + } + var j job[T] + err := result.Decode(&j) + if err != nil { + return nil, fmt.Errorf("failed to decode job from mongo collection: %w", err) + } + return &j, nil } // AddJob(job *job[T]) (uint64, error) // returns the job ID func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error) { - _, err := jqdb.coll.InsertOne(jqdb.ctx, job) + id, err := jqdb.GetNextJobId() + if err != nil { + return 0, fmt.Errorf("failed to get next job id: %w", err) + } + job.ID = id + _, err = jqdb.coll.InsertOne(jqdb.ctx, job) if err != nil { return 0, fmt.Errorf("failed to insert job into mongo collection: %w", err) } @@ -117,5 +129,14 @@ func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error) { // DeleteJob(jobID uint64) error func (jqdb *JobQueueDbMongo[T]) DeleteJob(jobID uint64) error { + result := jqdb.coll.FindOneAndDelete(jqdb.ctx, bson.D{{Key: "id", Value: jobID}}) + if !errors.Is(result.Err(), mongo.ErrNoDocuments) { + return fmt.Errorf("failed to delete job from mongo collection: %w", result.Err()) + } return nil } + +func dbCollectionNameForQueue(queueName string) string { + // TODO: normalize queueName + return queueName + "_jobs" +} diff --git a/jobqueue_test.go b/jobqueue_test.go index 711a8c8..13790ef 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -1,6 +1,7 @@ package jobqueue import ( + "context" "fmt" "math/rand" "os" @@ -13,9 +14,10 @@ import ( "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" -) -const BadgerDBPath = "/tmp/badger" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) func init() { //nolint:gochecknoinits // for testing zerolog.SetGlobalLevel(zerolog.DebugLevel) @@ -52,7 +54,6 @@ func TestNewJobQueue(t *testing.T) { // Test cases testCases := []struct { name string - dbPath string queueName string workers int options []Option[testJob] @@ -61,7 +62,6 @@ func TestNewJobQueue(t *testing.T) { }{ { name: "Valid configuration", - dbPath: "/tmp/test_jobqueue_1", queueName: "test-queue-1", workers: 2, options: []Option[testJob]{WithInMemDB[testJob]()}, @@ -69,7 +69,6 @@ func TestNewJobQueue(t *testing.T) { }, { name: "Invalid workers count", - dbPath: "/tmp/test_jobqueue_2", queueName: "test-queue-2", workers: -1, options: []Option[testJob]{WithInMemDB[testJob]()}, @@ -77,7 +76,6 @@ func TestNewJobQueue(t *testing.T) { }, { name: "Zero workers", - dbPath: "/tmp/test_jobqueue_3", queueName: "test-queue-3", workers: 0, options: []Option[testJob]{WithInMemDB[testJob]()}, @@ -91,7 +89,7 @@ func TestNewJobQueue(t *testing.T) { t.Parallel() // Act - jq, err := New[testJob](tc.dbPath, tc.queueName, tc.workers, testJobHandler(), tc.options...) + jq, err := New[testJob](tc.queueName, tc.workers, testJobHandler(), tc.options...) // Assert if tc.expectedError { @@ -116,7 +114,7 @@ func TestNewJobQueue(t *testing.T) { func TestJobQueue_Enqueue(t *testing.T) { cleanupBadgerDB(t) - jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInMemDB[testJob]()) + jq, err := New[testJob]("test-job", 0, testJobHandler(), WithInMemDB[testJob]()) assert.NoError(t, err) t.Cleanup(func() { @@ -144,7 +142,7 @@ func TestJobQueue_Enqueue(t *testing.T) { func TestJobQueue_ProcessJob(t *testing.T) { cleanupBadgerDB(t) - jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInMemDB[testJob]()) + jq, err := New[testJob]("test-job", 0, testJobHandler(), WithInMemDB[testJob]()) assert.NoError(t, err) t.Cleanup(func() { @@ -190,7 +188,7 @@ func TestJobQueue_Recovery(t *testing.T) { cleanupBadgerDB(t) // Create initial job queue - jq, err := New[testJob]("/tmp/badger", "test-job", 0, testJobHandler()) + jq, err := New[testJob]("test-job", 0, testJobHandler(), WithBadgerDB[testJob]("/tmp/badger")) assert.NoError(t, err) t.Cleanup(func() { @@ -205,7 +203,7 @@ func TestJobQueue_Recovery(t *testing.T) { assert.NoError(t, jq.Stop()) // Create recovered job queue - recoveredJq, err := New[testJob]("/tmp/badger", "test-job", 0, testJobHandler()) + recoveredJq, err := New[testJob]("test-job", 0, testJobHandler(), WithBadgerDB[testJob]("/tmp/badger")) assert.NoError(t, err) j := <-recoveredJq.jobs @@ -221,24 +219,29 @@ func TestJobQueue_Recovery(t *testing.T) { assert.NoError(t, recoveredJq.Stop()) } -func TestJobConcurrency(t *testing.T) { +func TestBadgerJobConcurrency(t *testing.T) { cleanupBadgerDB(t) - - // create initial job queue - - // mongo version - jq, err := New[testJob]("ignored", "test-job", 5, complexJobHandler(), - UseMongoDB[testJob]("mongodb://localhost:27017")) - - // badger version - //jq, err := New[testJob]("/tmp/badger", "test-job", 5, complexJobHandler()) - + jq, err := New[testJob]("test-job", 5, complexJobHandler(), WithBadgerDB[testJob]("/tmp/badger")) assert.NoError(t, err) t.Cleanup(func() { assert.NoError(t, jq.Stop()) cleanupBadgerDB(t) }) + DoJobConcurrencyTest(jq, t) +} + +func TestMongoJobConcurrency(t *testing.T) { + cleanupMongoDB(t) + jq, err := New[testJob]("test-job", 5, complexJobHandler(), WithMongoDB[testJob]("mongodb://localhost:27017")) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, jq.Stop()) + cleanupMongoDB(t) + }) + DoJobConcurrencyTest(jq, t) +} +func DoJobConcurrencyTest(jq *JobQueue[testJob], t *testing.T) { // Queue a bunch of jobs, which should be processed concurrently ids := make([]uint64, 0) for i := 0; i < 20; i++ { @@ -272,14 +275,23 @@ func TestJobConcurrency(t *testing.T) { _, ok := jq.isJobIDInQueue.Load(uint64(id)) assert.False(t, ok) - // Check that the job is no longer in the badger DB + // Check that the job is no longer in the JobQueue DB job, err := jq.db.ReadJob(uint64(id)) - assert.Error(t, err, badger.ErrKeyNotFound) + assert.Error(t, err, ErrJobNotFound) assert.Nil(t, job) } } func cleanupBadgerDB(t *testing.T) { - assert.NoError(t, os.RemoveAll(BadgerDBPath)) + assert.NoError(t, os.RemoveAll("/tmp/badger")) +} + +func cleanupMongoDB(t *testing.T) { + path := "mongodb://localhost:27017" + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(path)) + assert.NoError(t, err) + db := client.Database("job_queues") + assert.NoError(t, db.Drop(context.TODO())) + assert.NoError(t, client.Disconnect(context.Background())) } diff --git a/options.go b/options.go index 5a7a115..0136281 100644 --- a/options.go +++ b/options.go @@ -23,6 +23,14 @@ func WithJobBufferSize[T any](size int) Option[T] { } } +// how many jobs at once are retrieved from the DB in a single fetch operation +func WithJobsPerFetch[T any](count int) Option[T] { + return func(jq *JobQueue[T]) { + jq.logger.Debug().Msg(fmt.Sprintf("Jobs per fetch set to %d", count)) + jq.jobsPerFetch = count + } +} + // WithInmemDB uses an in-memory BadgerDB instead of a persistent one. // Useful for testing, but provides no durability guarantees. // if we previously called UseMongoDB, we will warn and ignore this option. @@ -30,6 +38,8 @@ func WithInMemDB[T any]() Option[T] { return func(jq *JobQueue[T]) { if jq.dbUseMongo { jq.logger.Warn().Msg("Ignoring WithInMemDB option, not compatible with UseMongoDB option") + } else if jq.dbUseBadger { + jq.logger.Warn().Msg("Ignoring WithInMemDB option, not compatible with UseBadgerDB option") } else { jq.logger.Debug().Msg("Using Badger In-Memory DB for Job Queue DB") jq.dbInMemory = true @@ -37,25 +47,34 @@ func WithInMemDB[T any]() Option[T] { } } -// how many jobs at once are retrieved from the DB in a single fetch operation -func WithJobsPerFetch[T any](count int) Option[T] { - return func(jq *JobQueue[T]) { - jq.logger.Debug().Msg(fmt.Sprintf("Jobs per fetch set to %d", count)) - jq.jobsPerFetch = count - } -} - -// UseMongoDB sets the JobQueue to use MongoDB instead of BadgerDB. +// WithMongoDB sets the JobQueue to use MongoDB instead of BadgerDB. // if WithInMemDB was previously called, we will warn and ignore this option. -func UseMongoDB[T any](uri string) Option[T] { +func WithMongoDB[T any](uri string) Option[T] { return func(jq *JobQueue[T]) { if jq.dbInMemory { - jq.logger.Warn().Msg("Ignoring UseMongoDB option, not compatible with WithInMemDB option") + jq.logger.Warn().Msg("Ignoring WithMongoDB option, not compatible with WithInMemDB option") + } else if jq.dbUseBadger { + jq.logger.Warn().Msg("Ignoring WitMongoDB option, not compatible with WithBadgerDB option") } else { jq.logger.Debug().Msg(fmt.Sprintf("Using Mongo DB at %s for Job Queue DB", uri)) - jq.dbInMemory = false jq.dbPath = uri jq.dbUseMongo = true } } } + +// WithBadgerDB sets the JobQueue to use BadgerDB instead of MongoDB. +// if WithInMemDB or WithBadgerDB was previously called, we will warn and ignore this option. +func WithBadgerDB[T any](path string) Option[T] { + return func(jq *JobQueue[T]) { + if jq.dbInMemory { + jq.logger.Warn().Msg("Ignoring WithBadgerDB option, not compatible with WithInMemDB option") + } else if jq.dbUseMongo { + jq.logger.Warn().Msg("Ignoring WithBadgerDB option, not compatible with WithMongoDB option") + } else { + jq.logger.Debug().Msg(fmt.Sprintf("Using Badger DB at %s for Job Queue DB", path)) + jq.dbPath = path + jq.dbUseBadger = true + } + } +} From 2eb33ff00cd098c70d6256ca20f8a93ee134c0c7 Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Sun, 18 Aug 2024 10:10:33 -0400 Subject: [PATCH 15/18] Mongo JobQueueDB is working IMPORTANT: this still doesn't fully support distributed concurrency, because multiple instances will fetch the same jobs. Still need to update the job records to indicate they are assigned to a job queue processor. --- jobqueue_db_mongo.go | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/jobqueue_db_mongo.go b/jobqueue_db_mongo.go index d8f0ecf..8c4a09c 100644 --- a/jobqueue_db_mongo.go +++ b/jobqueue_db_mongo.go @@ -81,19 +81,38 @@ func (jqdb *JobQueueDbMongo[T]) GetNextJobId() (uint64, error) { } else { return 0, fmt.Errorf("failed to get next job id: %w", result.Err()) } + } else { + raw, err := result.Raw() + if err != nil { + return 0, fmt.Errorf("failed to get raw result from mongo: %w", err) + } + val := raw.Lookup("next_job_id") + nextJobId = uint64(val.AsInt64()) } - raw, err := result.Raw() - if err != nil { - return 0, fmt.Errorf("failed to get raw result from mongo: %w", err) - } - val := raw.Lookup("next_job_id") - nextJobId = uint64(val.AsInt64()) return nextJobId, nil } // FetchJobs(count int) ([]*job[T], error) func (jqdb *JobQueueDbMongo[T]) FetchJobs(count int) ([]*job[T], error) { - return nil, nil + // create a new array of jobs + jobs := make([]*job[T], 0, count) + + opts := options.Find().SetLimit(int64(count)).SetAllowPartialResults(true) + cursor, err := jqdb.coll.Find(jqdb.ctx, bson.D{}, opts) + if err != nil { + return nil, fmt.Errorf("failed to fetch jobs from mongo collection: %w", err) + } + defer cursor.Close(jqdb.ctx) + for cursor.Next(jqdb.ctx) { + var j job[T] + err := cursor.Decode(&j) + if err != nil { + continue // skip this job + } else { + jobs = append(jobs, &j) + } + } + return jobs, nil } // ReadJob(jobID uint64) (*job[T], error) @@ -129,9 +148,9 @@ func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error) { // DeleteJob(jobID uint64) error func (jqdb *JobQueueDbMongo[T]) DeleteJob(jobID uint64) error { - result := jqdb.coll.FindOneAndDelete(jqdb.ctx, bson.D{{Key: "id", Value: jobID}}) - if !errors.Is(result.Err(), mongo.ErrNoDocuments) { - return fmt.Errorf("failed to delete job from mongo collection: %w", result.Err()) + _, err := jqdb.coll.DeleteOne(jqdb.ctx, bson.D{{Key: "id", Value: jobID}}) + if err != nil { + return fmt.Errorf("failed to delete job from mongo collection: %w", err) } return nil } From 3e31f6b45376f97968c3a1a73bdf1abed14fd0ff Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Sun, 18 Aug 2024 10:18:08 -0400 Subject: [PATCH 16/18] Update timestat.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- timestat.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/timestat.go b/timestat.go index f156d5c..83fa8a3 100644 --- a/timestat.go +++ b/timestat.go @@ -38,5 +38,9 @@ func (ts *TimeStat) RecordTime(duration time.Duration) { } func (ts *TimeStat) String() string { - return "tot " + ts.TotalTime.String() + " cnt " + strconv.FormatInt(ts.Count, 10) + " avg " + ts.AvgTime().String() + " min " + ts.MinTime.String() + " max " + ts.MaxTime.String() + return "tot " + ts.TotalTime.String() + + " cnt " + strconv.FormatInt(ts.Count, 10) + + " avg " + ts.AvgTime().String() + + " min " + ts.MinTime.String() + + " max " + ts.MaxTime.String() } From d35a9cbb34b8d375551dcf244f1699aea91b497e Mon Sep 17 00:00:00 2001 From: Ed Zavada Date: Sun, 18 Aug 2024 10:24:13 -0400 Subject: [PATCH 17/18] Update jobqueue_db_badger.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- jobqueue_db_badger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobqueue_db_badger.go b/jobqueue_db_badger.go index 902fefd..2669aa6 100644 --- a/jobqueue_db_badger.go +++ b/jobqueue_db_badger.go @@ -128,7 +128,7 @@ func (jqdb *JobQueueDbBadger[T]) ReadJob(jobID uint64) (*job[T], error) { return err }) if err != nil { - if err == badger.ErrKeyNotFound { + if errors.Is(err, badger.ErrKeyNotFound) { return nil, ErrJobNotFound } return nil, fmt.Errorf("failed to read job: %w", err) From 8c89544aa046604eb4488bc8cd6f1cf541a39c7e Mon Sep 17 00:00:00 2001 From: edmund-zavada <154269028+edmund-zavada@users.noreply.github.com> Date: Sun, 18 Aug 2024 14:31:28 -0400 Subject: [PATCH 18/18] Update jobqueue_db_badger.go fix issue introduced from commit of bot advice --- jobqueue_db_badger.go | 1 + 1 file changed, 1 insertion(+) diff --git a/jobqueue_db_badger.go b/jobqueue_db_badger.go index 2669aa6..7e3bf95 100644 --- a/jobqueue_db_badger.go +++ b/jobqueue_db_badger.go @@ -2,6 +2,7 @@ package jobqueue import ( "encoding/binary" + "errors" "fmt" "github.com/dgraph-io/badger/v4"