diff --git a/ci/test-core.json b/ci/test-core.json index 4756a5ed233..be94013fffa 100644 --- a/ci/test-core.json +++ b/ci/test-core.json @@ -36,9 +36,10 @@ "nomad/auth/...", "nomad/deploymentwatcher/...", "nomad/drainer/...", - "nomad/reporting/...", "nomad/lock/...", "nomad/peers/...", + "nomad/queues/...", + "nomad/reporting/...", "nomad/state/...", "nomad/stream/...", "nomad/structs/...", diff --git a/nomad/queues/batch_job_queue.go b/nomad/queues/batch_job_queue.go new file mode 100644 index 00000000000..f2053012843 --- /dev/null +++ b/nomad/queues/batch_job_queue.go @@ -0,0 +1,282 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "container/heap" + "context" + "errors" + "sync" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +var ErrWatchedEvalNotFound = errors.New("watched evaluation not found") + +type TenantID string + +type DynamicPriorityQueue struct { + // tenants is used to keep track of cluster usage for this queue. + // When workloads are placed or the configured interval is passed, + // cluster usage is updated for the workloads of each tenant. 
+ tenants map[TenantID]Tenant + + // queue is the main datastructure that contains all pending workloads + queue WorkloadQueue + + // qMux locks the queue during concurrent access + qMux sync.Mutex + + // qNotify allows for notifying the consumer that workloads + // have been added to the queue + qNotify chan struct{} + + // enqueueCh is used to buffer workloads before they + // are processed by the manager and pushed onto the queue + enqueueCh chan *Workload + + // totalUsage is the sum of all tenant usages + totalUsage int + + // conf contains user configurations for tuning the behavior of the queue + conf *DynamicPriorityConfig + + // evalBroker is the injected broker for passing an evaluation + // on to be scheduled by Nomad + evalBroker Queue + + // state is the in-memory state store used for both reconciling tenant + // workload usages, and polling submitted evaluations for placement + state *state.StateStore + logger hclog.Logger +} + +type DynamicPriorityConfig struct { + TenantType string + MetadataKey string + CalcInterval time.Duration +} + +type Tenant struct { + tid TenantID + workloads map[string]*Workload + usage int +} + +type Workload struct { + id string + tid TenantID + priority int + eval *structs.Evaluation + size int + index int +} + +func (w *Workload) calculatePriority(_ int64) { + // unimplemented +} + +func NewDynamicPriorityQueue(state *state.StateStore, broker Queue, conf *DynamicPriorityConfig, logger hclog.Logger) *DynamicPriorityQueue { + return &DynamicPriorityQueue{ + tenants: map[TenantID]Tenant{}, + queue: WorkloadQueue{}, + state: state, + enqueueCh: make(chan *Workload, 8096), + evalBroker: broker, + qMux: sync.Mutex{}, + qNotify: make(chan struct{}, 1), + conf: conf, + logger: logger.Named("Dynamic Priority Queue"), + } +} + +func (d *DynamicPriorityQueue) Start(ctx context.Context) { + // rebuild internal state from statestore, unimplemented + + go d.runProducer(ctx) + go d.runConsumer(ctx) +} + +// Enqueue is the method used 
to put evaluations on the queue. +// It generates a workload with an empty priority, appends it +// to an internal channel to be processed and added to the actual +// heap container. +func (d *DynamicPriorityQueue) Enqueue(e *structs.Evaluation) { + w := d.generateWorkload(e) + // in the event of an empty workload, just pass eval to eval broker + if w == nil { + d.evalBroker.Enqueue(e) + return + } + + d.enqueueCh <- w +} + +// runProducer pushes workloads onto the queue and notifies the consumer +// goroutine. It also updates priorities on the configured interval. +func (d *DynamicPriorityQueue) runProducer(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case w := <-d.enqueueCh: + w.calculatePriority(w.eval.CreateTime) + + d.qMux.Lock() + heap.Push(&d.queue, w) + d.qMux.Unlock() + + // Notify Workload consumer of new workload + select { + case d.qNotify <- struct{}{}: + default: + } + case <-time.After(d.conf.CalcInterval): + d.qMux.Lock() + d.calculatePriorities(time.Now().UnixNano()) + heap.Init(&d.queue) + d.qMux.Unlock() + } + } +} + +// runConsumer pops the highest priority workloads off the queue one +// at a time, enqueues them onto the Eval Broker, and waits for them +// to be placed before continuing. +func (d *DynamicPriorityQueue) runConsumer(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-d.qNotify: + + // Pop a workload off the queue if available + d.qMux.Lock() + workload := heap.Pop(&d.queue).(*Workload) + d.qMux.Unlock() + + // Give the eval to the eval broker + d.evalBroker.Enqueue(workload.eval) + + // Wait for the eval to be placed + err := d.waitForPlacement(ctx, workload.eval, memdb.NewWatchSet()) + if err != nil { + d.logger.Error("failure waiting for workload placement", "evalID", workload.eval) + } + + d.qMux.Lock() + l := d.queue.Len() + d.qMux.Unlock() + + // If the queue still has work, notify self + // to continue. 
+ if l > 0 { + select { + case d.qNotify <- struct{}{}: + default: + } + } + } + } +} + +// generateWorkload is used to create an initial workload from a given evaluation +func (d *DynamicPriorityQueue) generateWorkload(e *structs.Evaluation) *Workload { + job, err := d.state.JobByID(nil, e.Namespace, e.JobID) + if err != nil { + return nil + } + + tid := "" + switch d.conf.TenantType { + case "namespace": + tid = job.Namespace + case "metadata": + tenantID, ok := job.Meta[d.conf.MetadataKey] + if !ok { + return nil + } + tid = tenantID + default: + d.logger.Error("unknown tenant type, this is a bug.") + return nil + } + + return &Workload{ + tid: TenantID(tid), + priority: 0, + eval: e, + size: 0, + } +} + +func (d *DynamicPriorityQueue) calculatePriorities(time int64) { + // Decay tenant workload usages first, because a workload's + // priority relies on its tenant's usage. + for _, tenant := range d.tenants { + for range tenant.workloads { + // Unimplemented + d.totalUsage -= 0 + tenant.usage -= 0 + } + } + + // Now that we have accurate tenant usage, calculate + // each workload's new priority + for _, workload := range d.queue { + workload.calculatePriority(time) + } +} + +// waitForPlacement follows a given evaluation in the state store until it, or its next/blocked evals +// have been marked terminal, indicating the workload has been scheduled. 
+func (d *DynamicPriorityQueue) waitForPlacement(ctx context.Context, eval *structs.Evaluation, ws memdb.WatchSet) error { + for !eval.TerminalStatus() || eval.BlockedEval != "" || eval.NextEval != "" { + id := eval.ID + + if eval.BlockedEval != "" { + id = eval.BlockedEval + } else if eval.NextEval != "" { + id = eval.NextEval + } + + snap, err := d.state.Snapshot() + if err != nil { + return err + } + + // TODO: handle snapshot restores + abandonCh := snap.AbandonCh() + ws.Add(abandonCh) + + eval, err = snap.EvalByID(ws, id) + if err != nil { + return err + } + if eval == nil { + return ErrWatchedEvalNotFound + } + + if eval.TerminalStatus() { + continue + } + + // If the latest version of the eval isn't terminal, wait for an update + if err = ws.WatchCtx(ctx); err != nil { + return err + } + + // The watch channel will be closed, we should delete it to + // prevent immediately firing on the next WatchCtx + for k := range ws { + delete(ws, k) + } + } + + return nil +} diff --git a/nomad/queues/batch_job_queue_test.go b/nomad/queues/batch_job_queue_test.go new file mode 100644 index 00000000000..6f139419be2 --- /dev/null +++ b/nomad/queues/batch_job_queue_test.go @@ -0,0 +1,133 @@ +// Copyright IBM Corp. 
2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" +) + +func TestWaitForPlacement(t *testing.T) { + + t.Run("returns if eval complete", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval}) + + ws := memdb.NewWatchSet() + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval, ws) + doneCh <- err + }() + + testEval.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{testEval}) + + done := <-doneCh + + must.NoError(t, done) + }) + + t.Run("continues watching blocked evals", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + blocked := mock.Eval() + + testEval.Status = structs.EvalStatusComplete + testEval.BlockedEval = blocked.ID + + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, blocked}) + + ws := memdb.NewWatchSet() + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval, ws) + doneCh <- err + }() + + // We want to make sure the testQueue has begun a watch on the blocked eval + // before continuing, which is indicated by the length of the watchset being >0. 
+ must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if len(ws) == 0 { + return fmt.Errorf("blocking query not started yet") + } + return nil + }), + wait.Timeout(5*time.Second), + wait.Gap(100*time.Millisecond), + )) + + select { + case <-doneCh: + t.Fatal("should not have exited") + default: + } + + blocked.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{blocked}) + + done := <-doneCh + must.NoError(t, done) + }) + + t.Run("continues watching next evals after eval failure", func(t *testing.T) { + ss := state.TestStateStore(t) + testQueue := NewDynamicPriorityQueue(ss, nil, &DynamicPriorityConfig{}, hclog.New(hclog.DefaultOptions)) + + testEval := mock.Eval() + next := mock.Eval() + + testEval.Status = structs.EvalStatusFailed + testEval.NextEval = next.ID + + ss.UpsertEvals(structs.MsgTypeTestSetup, 0, []*structs.Evaluation{testEval, next}) + + ws := memdb.NewWatchSet() + doneCh := make(chan error) + go func() { + err := testQueue.waitForPlacement(t.Context(), testEval, ws) + doneCh <- err + }() + + // We want to make sure the testQueue has begun a watch on the next eval + // before continuing, which is indicated by the length of the watchset being >0. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if len(ws) == 0 { + return fmt.Errorf("blocking query not started yet") + } + return nil + }), + wait.Timeout(5*time.Second), + wait.Gap(100*time.Millisecond), + )) + + select { + case <-doneCh: + t.Fatal("should not have exited") + default: + } + + next.Status = structs.EvalStatusComplete + ss.UpsertEvals(structs.MsgTypeTestSetup, 1, []*structs.Evaluation{next}) + + done := <-doneCh + must.NoError(t, done) + }) +} diff --git a/nomad/queues/interface.go b/nomad/queues/interface.go new file mode 100644 index 00000000000..0d2d160792e --- /dev/null +++ b/nomad/queues/interface.go @@ -0,0 +1,10 @@ +// Copyright IBM Corp. 
2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import "github.com/hashicorp/nomad/nomad/structs" + +type Queue interface { + Enqueue(*structs.Evaluation) +} diff --git a/nomad/queues/priority_queue.go b/nomad/queues/priority_queue.go new file mode 100644 index 00000000000..fee398124af --- /dev/null +++ b/nomad/queues/priority_queue.go @@ -0,0 +1,45 @@ +// Copyright IBM Corp. 2015, 2026 +// SPDX-License-Identifier: BUSL-1.1 + +package queues + +import "container/heap" + +// A WorkloadQueue implements heap.Interface and holds *Workload. +type WorkloadQueue []*Workload + +func (pq WorkloadQueue) Len() int { return len(pq) } + +func (pq WorkloadQueue) Less(i, j int) bool { + // We want Pop to give us the highest, not lowest, priority so we use greater than here. + return pq[i].priority > pq[j].priority +} + +func (pq WorkloadQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *WorkloadQueue) Push(x any) { + n := len(*pq) + item := x.(*Workload) + item.index = n + *pq = append(*pq, item) +} + +func (pq *WorkloadQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // don't stop the GC from reclaiming the item eventually + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// update modifies the priority and value of an Item in the queue. +func (pq *WorkloadQueue) update(item *Workload, priority int) { + item.priority = priority + heap.Fix(pq, item.index) +}