From bf6097908a2c89c5484a104073aaf2ef1057d2ff Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Thu, 16 Apr 2026 14:35:06 -0400 Subject: [PATCH 01/12] batch job queue: adds initial empty implementation --- nomad/queues/batch_job_queue.go | 275 +++++++++++++++++++++++++++ nomad/queues/batch_job_queue_test.go | 101 ++++++++++ nomad/queues/interface.go | 7 + nomad/queues/priority_queue.go | 42 ++++ 4 files changed, 425 insertions(+) create mode 100644 nomad/queues/batch_job_queue.go create mode 100644 nomad/queues/batch_job_queue_test.go create mode 100644 nomad/queues/interface.go create mode 100644 nomad/queues/priority_queue.go diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go new file mode 100644 index 00000000000..82fc48be938 --- /dev/null +++ b/nomad/queues/batch_job_queue.go @@ -0,0 +1,275 @@ +package queues + +import ( + "container/heap" + "context" + "errors" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +var ErrWatchedEvalNotFound = errors.New("watched evaluation not found") + +type TenantID string + +type DynamicPriorityQueue struct { + // tenants is used to keep track of cluster usage for this queue. + // When workloads are placed or the configured interval is passed, + // cluster usage is updated for the workloads of each tenant. 
+ tenants map[TenantID]Tenant + + // queue is the main datastructure that contains all pending workloads + queue WorkloadQueue + + // qMux locks the queue during concurrent access + qMux *sync.Mutex + + // qNotify allows for notifying the consumer that workloads + // have been added to the queue + qNotify chan struct{} + + // enqueueCh is used to buffer workloads before they + // are processed by the manager and pushed onto the queue + enqueueCh chan *Workload + + // totalUsage is the sum of all tenant usages + totalUsage int + + // conf contains user configurations for tuning the behavior of the queue + conf *DynamicPriorityConfig + + // evalBroker is the injected broker for passing an evaluation + // on to be scheduled by Nomad + evalBroker Queue + + // state is the in-memory state store used for both reconciling tenant + // workload usages, and polling submitted evaluations for placement + state *state.StateStore + logger hclog.Logger +} + +type DynamicPriorityConfig struct { + TenantType string + MetadataKey string + CalcInterval time.Duration +} + +type Tenant struct { + tid TenantID + workloads map[string]*Workload + usage int +} + +type Workload struct { + id string + tid TenantID + priority int + eval *structs.Evaluation + size int + index int +} + +func (w *Workload) calculatePriority(_ int64) { + // unimplemented +} + +func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *DynamicPriorityConfig, logger hclog.Logger) *DynamicPriorityQueue { + return &DynamicPriorityQueue{ + tenants: map[TenantID]Tenant{}, + queue: WorkloadQueue{}, + state: state, + enqueueCh: make(chan *Workload, 8096), + evalBroker: broker, + qMux: &sync.Mutex{}, + qNotify: make(chan struct{}, 1), + conf: conf, + logger: logger.Named("Dynamic Priority Queue"), + } +} + +func (d *DynamicPriorityQueue) Start(ctx context.Context) { + // rebuild internal state from statestore, unimplemented + + go d.runManager(ctx) + go d.runConsumer(ctx) +} + +// Enqueue is used to produce 
a message on the queue by taking +func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { + w := d.generateWorkload(e) + // in the event of an empty workload, just pass eval to eval broker + if w == nil { + d.evalBroker.Enqueue(e) + return + } + + d.enqueueCh <- w +} + +// produce pushes workloads onto the queue and notifies the consumer +// goroutine. It also updates priorities on the configured interval. +func (d *DynamicPriorityQueue) runManager(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case w := <-d.enqueueCh: + w.calculatePriority(w.eval.CreateTime) + + d.qMux.Lock() + heap.Push(&d.queue, w) + d.qMux.Unlock() + + // Notify Workload processor of new workload + select { + case d.qNotify <- struct{}{}: + default: + } + case <-time.After(d.conf.CalcInterval): + d.qMux.Lock() + d.calculatePriorities(time.Now().UnixNano()) + heap.Init(&d.queue) // priorities were updated, reinit + d.qMux.Unlock() + } + } +} + +func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-d.qNotify: + + // Pop a workload off the queue if available + d.qMux.Lock() + workload := heap.Pop(&d.queue).(*Workload) + d.qMux.Unlock() + + // Give the eval to the eval broker + d.evalBroker.Enqueue(workload.eval) + + // Wait for the eval to be placed + d.waitForPlacement(ctx, workload.eval) + + d.qMux.Lock() + l := d.queue.Len() + d.qMux.Unlock() + + // If the queue still has work, notify it + // to continue. + if l > 0 { + select { + case d.qNotify <- struct{}{}: + default: + } + } + } + } +} + +// generateWorkload is used to create an initial workload from a given evaluation. 
+// It creates the tenantID from the queues config which is +func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload { + job, err := d.state.JobByID(nil, e.Namespace, e.JobID) + if err != nil { + return nil + } + + tid := "" + switch d.conf.TenantType { + case "namespace": + tid = job.Namespace + case "metadata": + tenantID, ok := job.Meta[d.conf.MetadataKey] + if !ok { + return nil + } + tid = tenantID + default: + d.logger.Error("unknown tenant type, this is a bug.") + return nil + } + + return &Workload{ + tid: TenantID(tid), + priority: 0, + eval: e, + size: 0, + } +} + +func (d *DynamicPriorityQueue) calculatePriorities(time int64) { + // Decay tenant workload usages first + for _, tenant := range d.tenants { + for range tenant.workloads { + // Unimplemented + d.totalUsage -= 0 + tenant.usage -= 0 + } + } + + // Now that we have accurate tenant usage, calculate + // each workloads new priority + for _, workload := range d.queue { + workload.calculatePriority(time) + } +} + +// waitForPlacement follows a given evalutation in the state store until it, or it's nexted/blocked evals +// have been marked terminal, indicating the workload has been scheduled. +// +// TODO: search codebase to see if there is already an established way to do this. +func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation) error { + for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" { + id := eval.ID + eval.TerminalStatus() + + if eval.BlockedEval != "" { + id = eval.BlockedEval + } else if eval.NextEval != "" { + id = eval.NextEval + } + + snap, err := d.state.Snapshot() + if err != nil { + return err + } + + ws := memdb.NewWatchSet() + eval, err = snap.EvalByID(ws, id) + if err != nil { + return err + } + if eval == nil { + return ErrWatchedEvalNotFound + } + + // Wait for the eval to be marked complete or context to cancel. + // This keeps the loop from spinning. 
+ for !eval.TerminalStatus() { + if err := ws.WatchCtx(ctx); err != nil { + return err + } + snap, err = d.state.Snapshot() + if err != nil { + return err + } + ws = memdb.NewWatchSet() + eval, err = snap.EvalByID(ws, eval.ID) + if err != nil { + return err + } + if eval == nil { + return ErrWatchedEvalNotFound + } + } + } + + return nil +} diff --git a/nomad/queues/batch_job_queue_test.go b/nomad/queues/batch_job_queue_test.go new file mode 100644 index 00000000000..8e5f7ab0efa --- /dev/null +++ b/nomad/queues/batch_job_queue_test.go @@ -0,0 +1,101 @@ +package queues + +import ( + "testing" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" +) + +func TestWorkloadPriority(t *testing.T) {} + +func TestWaitForPlacement(t *testing.T) { + + t.Run("returns if eval complete", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval}) + + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval) + doneCh <- err + }() + + testEval.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{testEval}) + + done := <-doneCh + + must.NoError(t, done) + }) + + t.Run("continues watching blocked evals", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + blocked := mock.Eval() + + testEval.Status = structs.EvalStatusComplete + testEval.BlockedEval = blocked.ID + + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, blocked}) + + doneCh := make(chan error) + go func() { + err := 
testQueue.waitForPlacement(t.Context(), testEval) + doneCh <- err + }() + + select { + case <-doneCh: + t.Fatal("should not have exited") + default: + } + + blocked.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{blocked}) + + done := <-doneCh + must.NoError(t, done) + + }) + + t.Run("continues watching next evals after eval failure", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + next := mock.Eval() + + testEval.Status = structs.EvalStatusFailed + testEval.NextEval = next.ID + + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, next}) + + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval) + doneCh <- err + }() + + select { + case <-doneCh: + t.Fatal("should not have exited") + default: + } + + next.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{next}) + + done := <-doneCh + must.NoError(t, done) + + }) +} diff --git a/nomad/queues/interface.go b/nomad/queues/interface.go new file mode 100644 index 00000000000..3ecd575cbd1 --- /dev/null +++ b/nomad/queues/interface.go @@ -0,0 +1,7 @@ +package queues + +import "github.com/hashicorp/nomad/nomad/structs" + +type Queue interface { + Enqueue(*structs.Evaluation) +} diff --git a/nomad/queues/priority_queue.go b/nomad/queues/priority_queue.go new file mode 100644 index 00000000000..feaedda0b7f --- /dev/null +++ b/nomad/queues/priority_queue.go @@ -0,0 +1,42 @@ +package queues + +import "container/heap" + +// A QorkloadQueue implements heap.Interface and holds Items. +type WorkloadQueue []*Workload + +func (pq WorkloadQueue) Len() int { return len(pq) } + +func (pq WorkloadQueue) Less(i, j int) bool { + // We want Pop to give us the highest, not lowest, priority so we use greater than here. 
+ return pq[i].priority > pq[j].priority +} + +func (pq WorkloadQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *WorkloadQueue) Push(x any) { + n := len(*pq) + item := x.(*Workload) + item.index = n + *pq = append(*pq, item) +} + +func (pq *WorkloadQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // don't stop the GC from reclaiming the item eventually + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// update modifies the priority and value of an Item in the queue. +func (pq *WorkloadQueue) update(item *Workload, priority int) { + item.priority = priority + heap.Fix(pq, item.index) +} From ac69bb67a459bbe1b2bc58626bfca2b402815f02 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 4 May 2026 15:47:54 -0400 Subject: [PATCH 02/12] rebase and refactor method name --- nomad/queues/batch_job_queue.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 82fc48be938..9b15ea3ccd0 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -95,7 +95,7 @@ func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *Dynami func (d *DynamicPriorityQueue) Start(ctx context.Context) { // rebuild internal state from statestore, unimplemented - go d.runManager(ctx) + go d.runProducer(ctx) go d.runConsumer(ctx) } @@ -111,9 +111,9 @@ func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { d.enqueueCh <- w } -// produce pushes workloads onto the queue and notifies the consumer +// runProducer pushes workloads onto the queue and notifies the consumer // goroutine. It also updates priorities on the configured interval. 
-func (d *DynamicPriorityQueue) runManager(ctx context.Context) { +func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { for { select { case <-ctx.Done(): @@ -228,7 +228,6 @@ func (d *DynamicPriorityQueue) calculatePriorities(time int64) { func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation) error { for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" { id := eval.ID - eval.TerminalStatus() if eval.BlockedEval != "" { id = eval.BlockedEval From 4c1f68126e454b6b6f29527ee9328b12ee197f37 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 4 May 2026 15:55:32 -0400 Subject: [PATCH 03/12] add headers and ci testing --- ci/test-core.json | 3 ++- nomad/queues/batch_job_queue.go | 3 +++ nomad/queues/batch_job_queue_test.go | 3 +++ nomad/queues/interface.go | 3 +++ nomad/queues/priority_queue.go | 3 +++ 5 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ci/test-core.json b/ci/test-core.json index 4756a5ed233..be94013fffa 100644 --- a/ci/test-core.json +++ b/ci/test-core.json @@ -36,9 +36,10 @@ "nomad/auth/...", "nomad/deploymentwatcher/...", "nomad/drainer/...", - "nomad/reporting/...", "nomad/lock/...", "nomad/peers/...", + "nomad/queues/...", + "nomad/reporting/...", "nomad/state/...", "nomad/stream/...", "nomad/structs/...", diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 9b15ea3ccd0..fb3cc8a56bf 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -1,3 +1,6 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + package queues import ( diff --git a/nomad/queues/batch_job_queue_test.go b/nomad/queues/batch_job_queue_test.go index 8e5f7ab0efa..4629bfcaee6 100644 --- a/nomad/queues/batch_job_queue_test.go +++ b/nomad/queues/batch_job_queue_test.go @@ -1,3 +1,6 @@ +// Copyright IBM Corp. 
2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + package queues import ( diff --git a/nomad/queues/interface.go b/nomad/queues/interface.go index 3ecd575cbd1..0d2d160792e 100644 --- a/nomad/queues/interface.go +++ b/nomad/queues/interface.go @@ -1,3 +1,6 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + package queues import "github.com/hashicorp/nomad/nomad/structs" diff --git a/nomad/queues/priority_queue.go b/nomad/queues/priority_queue.go index feaedda0b7f..56bbcba4dac 100644 --- a/nomad/queues/priority_queue.go +++ b/nomad/queues/priority_queue.go @@ -1,3 +1,6 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + package queues import "container/heap" From 4d545280281eb6d88eaba9d729535bbcf61355c5 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Mon, 4 May 2026 16:38:13 -0400 Subject: [PATCH 04/12] fix comments and add some godocs --- nomad/queues/batch_job_queue.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index fb3cc8a56bf..41ac5bd39e1 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -102,7 +102,10 @@ func (d *DynamicPriorityQueue) Start(ctx context.Context) { go d.runConsumer(ctx) } -// Enqueue is used to produce a message on the queue by taking +// Enqueue is the method used to put evaluations on the queue. +// It generates a workload with an empty priority, appends it +// to an internal channel to be processed and added to the actual +// heap container. 
func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { w := d.generateWorkload(e) // in the event of an empty workload, just pass eval to eval broker @@ -136,12 +139,15 @@ func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { case <-time.After(d.conf.CalcInterval): d.qMux.Lock() d.calculatePriorities(time.Now().UnixNano()) - heap.Init(&d.queue) // priorities were updated, reinit + heap.Init(&d.queue) d.qMux.Unlock() } } } +// runConsumer pops the highest priority workloads off the queue one +// at a time, enqueues them onto the Eval Broker, and waits for them +// to be placed before continuing. func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { for { select { @@ -164,7 +170,7 @@ func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { l := d.queue.Len() d.qMux.Unlock() - // If the queue still has work, notify it + // If the queue still has work, notify self // to continue. if l > 0 { select { @@ -176,8 +182,7 @@ func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { } } -// generateWorkload is used to create an initial workload from a given evaluation. -// It creates the tenantID from the queues config which is +// generateWorkload is used to create an initial workload from a given evaluation func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload { job, err := d.state.JobByID(nil, e.Namespace, e.JobID) if err != nil { @@ -208,7 +213,8 @@ func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload } func (d *DynamicPriorityQueue) calculatePriorities(time int64) { - // Decay tenant workload usages first + // Decay tenant workload usages first, because a workload's + // priority relies on its tenant's usage. 
for _, tenant := range d.tenants { for range tenant.workloads { // Unimplemented @@ -227,7 +233,7 @@ func (d *DynamicPriorityQueue) calculatePriorities(time int64) { // waitForPlacement follows a given evalutation in the state store until it, or it's nexted/blocked evals // have been marked terminal, indicating the workload has been scheduled. // -// TODO: search codebase to see if there is already an established way to do this. +// TODO (mismithhisler): is there a better way to do this? func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation) error { for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" { id := eval.ID From 053d30d45737fb724f282d989a14d50643a239a8 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 5 May 2026 11:25:16 -0400 Subject: [PATCH 05/12] Apply suggestions from code review Co-authored-by: Tim Gross --- nomad/queues/batch_job_queue.go | 7 ++++++- nomad/queues/priority_queue.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 41ac5bd39e1..9aeb0710489 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -30,7 +30,7 @@ type DynamicPriorityQueue struct { queue WorkloadQueue // qMux locks the queue during concurrent access - qMux *sync.Mutex + qMux sync.Mutex // qNotify allows for notifying the consumer that workloads // have been added to the queue @@ -249,7 +249,12 @@ func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *struc return err } + abandonCh := state.AbandonCh() ws := memdb.NewWatchSet() + ws.Add(abandonCh) + if err = ws.WatchCtx(ctx); err != nil { + return err + } eval, err = snap.EvalByID(ws, id) if err != nil { return err diff --git a/nomad/queues/priority_queue.go b/nomad/queues/priority_queue.go index 56bbcba4dac..fee398124af 100644 --- a/nomad/queues/priority_queue.go +++ b/nomad/queues/priority_queue.go @@ -5,7 
+5,7 @@ package queues import "container/heap" -// A QorkloadQueue implements heap.Interface and holds Items. +// A WorkloadQueue implements heap.Interface and holds *Workload. type WorkloadQueue []*Workload func (pq WorkloadQueue) Len() int { return len(pq) } From ebe1ec216bac71aa42e714b6cd33d09a1e8ad459 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 5 May 2026 14:44:30 -0400 Subject: [PATCH 06/12] refactor waitForPlacement and update tests --- nomad/queues/batch_job_queue.go | 40 +++++++++----------------- nomad/queues/batch_job_queue_test.go | 43 +++++++++++++++++++++++----- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 9aeb0710489..7ec77d90ea6 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -88,7 +88,7 @@ func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *Dynami state: state, enqueueCh: make(chan *Workload, 8096), evalBroker: broker, - qMux: &sync.Mutex{}, + qMux: sync.Mutex{}, qNotify: make(chan struct{}, 1), conf: conf, logger: logger.Named("Dynamic Priority Queue"), @@ -164,7 +164,7 @@ func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { d.evalBroker.Enqueue(workload.eval) // Wait for the eval to be placed - d.waitForPlacement(ctx, workload.eval) + d.waitForPlacement(ctx, workload.eval, memdb.NewWatchSet()) d.qMux.Lock() l := d.queue.Len() @@ -234,7 +234,7 @@ func (d *DynamicPriorityQueue) calculatePriorities(time int64) { // have been marked terminal, indicating the workload has been scheduled. // // TODO (mismithhisler): is there a better way to do this? 
-func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation) error { +func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation, ws memdb.WatchSet) error { for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" { id := eval.ID @@ -248,13 +248,12 @@ func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *struc if err != nil { return err } + // ws := memdb.NewWatchSet() - abandonCh := state.AbandonCh() - ws := memdb.NewWatchSet() + // TODO: handle snapshot restores + abandonCh := snap.AbandonCh() ws.Add(abandonCh) - if err = ws.WatchCtx(ctx); err != nil { - return err - } + eval, err = snap.EvalByID(ws, id) if err != nil { return err @@ -263,24 +262,13 @@ func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *struc return ErrWatchedEvalNotFound } - // Wait for the eval to be marked complete or context to cancel. - // This keeps the loop from spinning. - for !eval.TerminalStatus() { - if err := ws.WatchCtx(ctx); err != nil { - return err - } - snap, err = d.state.Snapshot() - if err != nil { - return err - } - ws = memdb.NewWatchSet() - eval, err = snap.EvalByID(ws, eval.ID) - if err != nil { - return err - } - if eval == nil { - return ErrWatchedEvalNotFound - } + if eval.TerminalStatus() { + continue + } + + // If the latest version of the eval isn't terminal, wait for an update + if err = ws.WatchCtx(ctx); err != nil { + return err } } diff --git a/nomad/queues/batch_job_queue_test.go b/nomad/queues/batch_job_queue_test.go index 4629bfcaee6..44e59392a49 100644 --- a/nomad/queues/batch_job_queue_test.go +++ b/nomad/queues/batch_job_queue_test.go @@ -4,17 +4,19 @@ package queues import ( + "fmt" "testing" + "time" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/shoenig/test/must" + 
"github.com/shoenig/test/wait" ) -func TestWorkloadPriority(t *testing.T) {} - func TestWaitForPlacement(t *testing.T) { t.Run("returns if eval complete", func(t *testing.T) { @@ -24,9 +26,10 @@ func TestWaitForPlacement(t *testing.T) { testEval := mock.Eval() ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval}) + ws := memdb.NewWatchSet() doneCh := make(chan error) go func() { - err := testQueue.waitForPlacement(t.Context(), testEval) + err := testQueue.waitForPlacement(t.Context(), testEval, ws) doneCh <- err }() @@ -50,12 +53,26 @@ func TestWaitForPlacement(t *testing.T) { ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, blocked}) + ws := memdb.NewWatchSet() doneCh := make(chan error) go func() { - err := testQueue.waitForPlacement(t.Context(), testEval) + err := testQueue.waitForPlacement(t.Context(), testEval, ws) doneCh <- err }() + // We've want to make sure the testQueue has begun a watch on the blocked eval + // before continuing, which is indicated by the length of the watchset being >0. 
+ must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if len(ws) == 0 { + return fmt.Errorf("blocking query not started yet") + } + return nil + }), + wait.Timeout(5*time.Second), + wait.Gap(100*time.Millisecond), + )) + select { case <-doneCh: t.Fatal("should not have exited") @@ -67,7 +84,6 @@ func TestWaitForPlacement(t *testing.T) { done := <-doneCh must.NoError(t, done) - }) t.Run("continues watching next evals after eval failure", func(t *testing.T) { @@ -82,12 +98,26 @@ func TestWaitForPlacement(t *testing.T) { ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, next}) + ws := memdb.NewWatchSet() doneCh := make(chan error) go func() { - err := testQueue.waitForPlacement(t.Context(), testEval) + err := testQueue.waitForPlacement(t.Context(), testEval, ws) doneCh <- err }() + // We've want to make sure the testQueue has begun a watch on the blocked eval + // before continuing, which is indicated by the length of the watchset being >0. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if len(ws) == 0 { + return fmt.Errorf("blocking query not started yet") + } + return nil + }), + wait.Timeout(5*time.Second), + wait.Gap(100*time.Millisecond), + )) + select { case <-doneCh: t.Fatal("should not have exited") @@ -99,6 +129,5 @@ func TestWaitForPlacement(t *testing.T) { done := <-doneCh must.NoError(t, done) - }) } From 8b92e89ceb4407a7fb715b41aac1eb3b26c0f3fb Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 5 May 2026 14:52:05 -0400 Subject: [PATCH 07/12] remove old watches --- nomad/queues/batch_job_queue.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 7ec77d90ea6..7ecf9cc865b 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -7,6 +7,7 @@ import ( "container/heap" "context" "errors" + "maps" "sync" "time" @@ -248,7 +249,6 @@ func (d *DynamicPriorityQueue) 
waitForPlacement(ctx context.Context, eval *struc if err != nil { return err } - // ws := memdb.NewWatchSet() // TODO: handle snapshot restores abandonCh := snap.AbandonCh() @@ -270,6 +270,12 @@ func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *struc if err = ws.WatchCtx(ctx); err != nil { return err } + + // The watch channel will be closed, we should delete it to + // prevent immediately firing on the next WatchCtx + for k := range ws { + delete(ws, k) + } } return nil From 06e5087c65addfe63d3eaa390f7b077f41ab32c9 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 5 May 2026 14:54:56 -0400 Subject: [PATCH 08/12] fix import --- nomad/queues/batch_job_queue.go | 1 - 1 file changed, 1 deletion(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 7ecf9cc865b..a088c8f9f4f 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -7,7 +7,6 @@ import ( "container/heap" "context" "errors" - "maps" "sync" "time" From 113996361ad8b76c60c9c93082b40bbbf5b9038e Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 5 May 2026 15:44:54 -0400 Subject: [PATCH 09/12] fix comments --- nomad/queues/batch_job_queue.go | 2 -- nomad/queues/batch_job_queue_test.go | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index a088c8f9f4f..39ed27afc9a 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -232,8 +232,6 @@ func (d *DynamicPriorityQueue) calculatePriorities(time int64) { // waitForPlacement follows a given evalutation in the state store until it, or it's nexted/blocked evals // have been marked terminal, indicating the workload has been scheduled. -// -// TODO (mismithhisler): is there a better way to do this? 
func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation, ws memdb.WatchSet) error { for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" { id := eval.ID diff --git a/nomad/queues/batch_job_queue_test.go b/nomad/queues/batch_job_queue_test.go index 44e59392a49..6f139419be2 100644 --- a/nomad/queues/batch_job_queue_test.go +++ b/nomad/queues/batch_job_queue_test.go @@ -60,7 +60,7 @@ func TestWaitForPlacement(t *testing.T) { doneCh <- err }() - // We've want to make sure the testQueue has begun a watch on the blocked eval + // We want to make sure the testQueue has begun a watch on the blocked eval // before continuing, which is indicated by the length of the watchset being >0. must.Wait(t, wait.InitialSuccess( wait.ErrorFunc(func() error { @@ -105,7 +105,7 @@ func TestWaitForPlacement(t *testing.T) { doneCh <- err }() - // We've want to make sure the testQueue has begun a watch on the blocked eval + // We want to make sure the testQueue has begun a watch on the blocked eval // before continuing, which is indicated by the length of the watchset being >0. 
must.Wait(t, wait.InitialSuccess( wait.ErrorFunc(func() error { @@ -99,6 +129,5 @@ func TestWaitForPlacement(t *testing.T) { done := <-doneCh must.NoError(t, done) - }) } From 175dc120908591ac0dc7306bd711fa0728a4c150 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 5 May 2026 15:46:29 -0400 Subject: [PATCH 10/12] more comment updates --- nomad/queues/batch_job_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 39ed27afc9a..e81a09459c3 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -131,7 +131,7 @@ func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { heap.Push(&d.queue, w) d.qMux.Unlock() - // Notify Workload processor of new workload + // Notify Workload consumer of new workload select { case d.qNotify <- struct{}{}: default: From 2e078d62751338457ca384f611646196f592c627 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Tue, 5 May 2026 18:15:37 -0400 Subject: [PATCH 11/12] check waitForPlacement err --- nomad/queues/batch_job_queue.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index e81a09459c3..0c25f01056f 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -7,6 +7,7 @@ import ( "container/heap" "context" "errors" + "os" "sync" "time" @@ -164,7 +165,10 @@ func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { d.evalBroker.Enqueue(workload.eval) // Wait for the eval to be placed - d.waitForPlacement(ctx, workload.eval, memdb.NewWatchSet()) + err := d.waitForPlacement(ctx, workload.eval, memdb.NewWatchSet()) + if err != nil { + d.logger.Error("failure waiting for workload placement", "evalID", workload.eval.ID) + } d.qMux.Lock() l := d.queue.Len() From d06ff35d0c894a36fb92991ed8ed3c3bf8f41fc8 Mon Sep 17 00:00:00 2001 From: Michael Smithhisler Date: Wed, 6 May 2026 09:17:29 -0400 Subject: [PATCH 12/12] fix imports --- nomad/queues/batch_job_queue.go | 1 - 1 file changed, 1
deletion(-) diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go index 0c25f01056f..f2053012843 100644 --- a/nomad/queues/batch_job_queue.go +++ b/nomad/queues/batch_job_queue.go @@ -7,7 +7,6 @@ import ( "container/heap" "context" "errors" - "os" "sync" "time"