diff --git a/scheduler/reconciler/filters.go b/scheduler/reconciler/filters.go
index 5ae45618418..17c135baad5 100644
--- a/scheduler/reconciler/filters.go
+++ b/scheduler/reconciler/filters.go
@@ -99,6 +99,171 @@ func (set allocSet) filterOldTerminalAllocs(a ReconcilerState) (remain, ignore a
 	return remain, ignored
 }
 
+// allocCategory represents the classification bucket for an allocation.
+type allocCategory string
+
+const (
+	categoryUntainted     allocCategory = "untainted"
+	categoryMigrate       allocCategory = "migrate"
+	categoryLost          allocCategory = "lost"
+	categoryDisconnecting allocCategory = "disconnecting"
+	categoryReconnecting  allocCategory = "reconnecting"
+	categoryIgnore        allocCategory = "ignore"
+	categoryExpiring      allocCategory = "expiring"
+)
+
+// allocContext holds the per-allocation data used by classification rules.
+type allocContext struct {
+	alloc           *structs.Allocation
+	shouldReconnect bool
+	taintedNode     *structs.Node
+	nodeIsTainted   bool
+	now             time.Time
+}
+
+type classificationRule struct {
+	condition func(allocContext) bool
+	category  allocCategory
+}
+
+// classificationRules are evaluated in order; first match wins.
+var classificationRules = []classificationRule{
+	// Failed allocs that need to reconnect and are still desired to run.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.shouldReconnect &&
+				ctx.alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
+				ctx.alloc.ClientStatus == structs.AllocClientStatusFailed
+		},
+		category: categoryReconnecting,
+	},
+	// Server-terminal allocs should be ignored when they are not reconnecting.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.alloc.TerminalStatus() && !ctx.shouldReconnect && ctx.alloc.ServerTerminalStatus()
+		},
+		category: categoryIgnore,
+	},
+	// Terminal canaries marked for migration.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.alloc.TerminalStatus() && !ctx.shouldReconnect &&
+				ctx.alloc.DeploymentStatus.IsCanary() && ctx.alloc.DesiredTransition.ShouldMigrate()
+		},
+		category: categoryMigrate,
+	},
+	// Terminal allocs that are not reconnecting are untainted.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.alloc.TerminalStatus() && !ctx.shouldReconnect
+		},
+		category: categoryUntainted,
+	},
+	// Expired allocs are handled before the reconnecting/disconnecting paths.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.alloc.Expired(ctx.now)
+		},
+		category: categoryExpiring,
+	},
+	// Failed reconnects that the server already marked to stop should be ignored.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.shouldReconnect &&
+				ctx.alloc.ClientStatus == structs.AllocClientStatusFailed &&
+				ctx.alloc.DesiredStatus == structs.AllocDesiredStatusStop
+		},
+		category: categoryIgnore,
+	},
+	// Disconnected node and alloc already marked unknown.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode != nil &&
+				ctx.taintedNode.Status == structs.NodeStatusDisconnected &&
+				ctx.alloc.ClientStatus == structs.AllocClientStatusUnknown
+		},
+		category: categoryUntainted,
+	},
+	// Disconnected pending allocs should be replaced immediately.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode != nil &&
+				ctx.taintedNode.Status == structs.NodeStatusDisconnected &&
+				ctx.alloc.ClientStatus == structs.AllocClientStatusPending
+		},
+		category: categoryLost,
+	},
+	// Disconnected allocs with a nil disconnect block or lost_after=0 are lost.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode != nil &&
+				ctx.taintedNode.Status == structs.NodeStatusDisconnected &&
+				ctx.alloc.DisconnectTimeout(ctx.now) == ctx.now
+		},
+		category: categoryLost,
+	},
+	// Remaining disconnected allocs are disconnecting.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode != nil && ctx.taintedNode.Status == structs.NodeStatusDisconnected
+		},
+		category: categoryDisconnecting,
+	},
+	// Non-terminal allocs marked for migration always migrate.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.alloc.DesiredTransition.ShouldMigrate()
+		},
+		category: categoryMigrate,
+	},
+	// Ready or untainted nodes with reconnecting allocs.
+	{
+		condition: func(ctx allocContext) bool {
+			return (!ctx.nodeIsTainted || (ctx.taintedNode != nil && ctx.taintedNode.Status == structs.NodeStatusReady)) &&
+				ctx.shouldReconnect
+		},
+		category: categoryReconnecting,
+	},
+	// Ready or untainted nodes.
+	{
+		condition: func(ctx allocContext) bool {
+			return !ctx.nodeIsTainted || (ctx.taintedNode != nil && ctx.taintedNode.Status == structs.NodeStatusReady)
+		},
+		category: categoryUntainted,
+	},
+	// GC'd nodes (a tainted map entry with a nil node) are lost.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode == nil
+		},
+		category: categoryLost,
+	},
+	// Terminal node allocations that cannot be replaced.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode.TerminalStatus() &&
+				!ctx.alloc.ReplaceOnDisconnect() &&
+				ctx.alloc.ClientStatus == structs.AllocClientStatusUnknown
+		},
+		category: categoryUntainted,
+	},
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode.TerminalStatus() &&
+				!ctx.alloc.ReplaceOnDisconnect() &&
+				ctx.alloc.ClientStatus == structs.AllocClientStatusRunning
+		},
+		category: categoryDisconnecting,
+	},
+	// Remaining terminal node allocs are lost.
+	{
+		condition: func(ctx allocContext) bool {
+			return ctx.taintedNode.TerminalStatus()
+		},
+		category: categoryLost,
+	},
+}
+
 // filterByTainted takes a set of tainted nodes and filters the allocation set
 // into the following groups:
 // 1. Those that exist on untainted nodes
@@ -117,128 +282,50 @@ func (set allocSet) filterByTainted(state ClusterState) (untainted, migrate, los
 	ignore = make(allocSet)
 	expiring = make(allocSet)
 
+	// Map each category to a pointer to the named result set it populates.
+	categoryMap := map[allocCategory]*allocSet{
+		categoryUntainted:     &untainted,
+		categoryMigrate:       &migrate,
+		categoryLost:          &lost,
+		categoryDisconnecting: &disconnecting,
+		categoryReconnecting:  &reconnecting,
+		categoryIgnore:        &ignore,
+		categoryExpiring:      &expiring,
+	}
+
 	for _, alloc := range set {
-		shouldReconnect := false
+		// Build context for classification rules.
+		ctx := allocContext{
+			alloc: alloc,
+			now:   state.Now,
+		}
 
-		// Only compute reconnect for unknown, running, and failed since they
-		// need to go through the reconnect logic.
+		// Compute shouldReconnect for unknown/running/failed allocs.
 		if alloc.ClientStatus == structs.AllocClientStatusUnknown ||
 			alloc.ClientStatus == structs.AllocClientStatusRunning ||
 			alloc.ClientStatus == structs.AllocClientStatusFailed {
-			shouldReconnect = alloc.NeedsToReconnect()
-		}
-
-		// Failed allocs that need to be reconnected must be added to
-		// reconnecting so that they can be handled as a failed reconnect.
-		if shouldReconnect &&
-			alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
-			alloc.ClientStatus == structs.AllocClientStatusFailed {
-			reconnecting[alloc.ID] = alloc
-			continue
-		}
-
-		if alloc.TerminalStatus() && !shouldReconnect {
-			// Server-terminal allocs, if they should not reconnect,
-			// are probably stopped replacements and should be ignored
-			if alloc.ServerTerminalStatus() {
-				ignore[alloc.ID] = alloc
-				continue
-			}
-
-			// Terminal canaries that have been marked for migration need to be
-			// migrated, otherwise we block deployments from progressing by
-			// counting them as running canaries.
-			if alloc.DeploymentStatus.IsCanary() && alloc.DesiredTransition.ShouldMigrate() {
-				migrate[alloc.ID] = alloc
-				continue
-			}
-
-			// Terminal allocs, if not reconnect, are always untainted as they
-			// should never be migrated.
-			untainted[alloc.ID] = alloc
-			continue
-		}
-
-		// The alloc is expired. These end up getting added to "lost" but this
-		// skips having to evaluate the reconnecting logic for these allocs.
-		if alloc.Expired(state.Now) {
-			expiring[alloc.ID] = alloc
-			continue
-		}
-
-		// Ignore failed allocs that need to be reconnected and that have been
-		// marked to stop by the server.
-		if shouldReconnect &&
-			alloc.ClientStatus == structs.AllocClientStatusFailed &&
-			alloc.DesiredStatus == structs.AllocDesiredStatusStop {
-			ignore[alloc.ID] = alloc
-			continue
+			ctx.shouldReconnect = alloc.NeedsToReconnect()
 		}
 
-		taintedNode, nodeIsTainted := state.TaintedNodes[alloc.NodeID]
-		if taintedNode != nil && taintedNode.Status == structs.NodeStatusDisconnected {
-			// ignore allocs already marked unknown, they should already have a followup eval
-			if alloc.ClientStatus == structs.AllocClientStatusUnknown {
-				untainted[alloc.ID] = alloc
-				continue
-			}
-			// If the alloc is pending mark it lost so it is replaced immediately.
-			if alloc.ClientStatus == structs.AllocClientStatusPending {
-				lost[alloc.ID] = alloc
-				continue
+		// Get node taint information, preserving whether the key existed so we
+		// can distinguish missing-node from GC'd-node semantics.
+		ctx.taintedNode, ctx.nodeIsTainted = state.TaintedNodes[alloc.NodeID]
+
+		// Apply classification rules in order (first match wins).
+		classified := false
+		for _, rule := range classificationRules {
+			if rule.condition(ctx) {
+				targetSet := categoryMap[rule.category]
+				(*targetSet)[alloc.ID] = alloc
+				classified = true
+				break
 			}
-			// Allocs with nil disconnect blocks or disconnect.lost_after == 0 are lost
-			if alloc.DisconnectTimeout(state.Now) == state.Now {
-				lost[alloc.ID] = alloc
-				continue
-			}
-			disconnecting[alloc.ID] = alloc
-			continue
-		}
-
-		// Non-terminal allocs that should migrate should always migrate
-		if alloc.DesiredTransition.ShouldMigrate() {
-			migrate[alloc.ID] = alloc
-			continue
 		}
 
-		if !nodeIsTainted || (taintedNode != nil && taintedNode.Status == structs.NodeStatusReady) {
-			// Filter allocs on a node that is now re-connected to be resumed.
-			if shouldReconnect {
-				reconnecting[alloc.ID] = alloc
-				continue
-			}
-
-			// Otherwise, Node is untainted so alloc is untainted
+		// Default category if no rule matched.
+		if !classified {
 			untainted[alloc.ID] = alloc
-			continue
-		}
-
-		// Allocs on GC'd (nil) or lost nodes are Lost
-		if taintedNode == nil {
-			lost[alloc.ID] = alloc
-			continue
-		}
-
-		// Allocs on terminal nodes that can't be rescheduled need to be treated
-		// differently than those that can.
-		if taintedNode.TerminalStatus() {
-			if !alloc.ReplaceOnDisconnect() {
-				if alloc.ClientStatus == structs.AllocClientStatusUnknown {
-					untainted[alloc.ID] = alloc
-					continue
-				} else if alloc.ClientStatus == structs.AllocClientStatusRunning {
-					disconnecting[alloc.ID] = alloc
-					continue
-				}
-			}
-
-			lost[alloc.ID] = alloc
-			continue
-		}
-
-		// All other allocs are untainted
-		untainted[alloc.ID] = alloc
+		}
 	}
 
 	return
diff --git a/scheduler/reconciler/filters_test.go b/scheduler/reconciler/filters_test.go
index c54c94b6739..4649ea59195 100644
--- a/scheduler/reconciler/filters_test.go
+++ b/scheduler/reconciler/filters_test.go
@@ -5,7 +5,9 @@ package reconciler
 
 import (
 	"testing"
+	"time"
 
+	"github.com/hashicorp/nomad/helper/pointer"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/shoenig/test/must"
@@ -72,3 +74,237 @@ func TestReconciler_filterServerTerminalAllocs(t *testing.T) {
 		must.MapLen(t, 6, filtered)
 	})
 }
+
+func TestAllocSet_filterByTainted_ClassificationRules(t *testing.T) {
+	now := time.Now()
+
+	nodes := map[string]*structs.Node{
+		"ready": {
+			ID:     "ready",
+			Status: structs.NodeStatusReady,
+		},
+		"disconnected": {
+			ID:     "disconnected",
+			Status: structs.NodeStatusDisconnected,
+		},
+		"down": {
+			ID:     "down",
+			Status: structs.NodeStatusDown,
+		},
+		"initializing": {
+			ID:     "initializing",
+			Status: structs.NodeStatusInit,
+		},
+		"gc": nil,
+	}
+
+	setDisconnect := func(alloc *structs.Allocation, lostAfter time.Duration, replace bool) {
+		alloc.Job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
+			LostAfter: lostAfter,
+			Replace:   pointer.Of(replace),
+		}
+	}
+
+	makeAlloc := func(id, nodeID, clientStatus, desiredStatus string) *structs.Allocation {
+		alloc := mock.Alloc()
+		alloc.ID = id
+		alloc.NodeID = nodeID
+		alloc.ClientStatus = clientStatus
+		alloc.DesiredStatus = desiredStatus
+		alloc.AllocStates = nil
+		alloc.DeploymentStatus = nil
+		alloc.DesiredTransition = structs.DesiredTransition{}
+		setDisconnect(alloc, 5*time.Minute, true)
+		return alloc
+	}
+
+	unknownState := []*structs.AllocState{{
+		Field: structs.AllocStateFieldClientStatus,
+		Value: structs.AllocClientStatusUnknown,
+		Time:  now.Add(-10 * time.Second),
+	}}
+
+	testCases := []struct {
+		name     string
+		alloc    *structs.Allocation
+		nodeMap  map[string]*structs.Node
+		expected string
+	}{
+		{
+			name: "failed reconnect run",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c1", "ready", structs.AllocClientStatusFailed, structs.AllocDesiredStatusRun)
+				a.AllocStates = unknownState
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "reconnecting",
+		},
+		{
+			name:     "terminal server status ignored",
+			alloc:    makeAlloc("c2", "ready", structs.AllocClientStatusRunning, structs.AllocDesiredStatusStop),
+			nodeMap:  nodes,
+			expected: "ignore",
+		},
+		{
+			name: "terminal canary migrate",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c3", "ready", structs.AllocClientStatusComplete, structs.AllocDesiredStatusRun)
+				a.DeploymentStatus = &structs.AllocDeploymentStatus{Canary: true}
+				a.DesiredTransition = structs.DesiredTransition{Migrate: pointer.Of(true)}
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "migrate",
+		},
+		{
+			name:     "terminal untainted",
+			alloc:    makeAlloc("c4", "ready", structs.AllocClientStatusComplete, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "untainted",
+		},
+		{
+			name: "expired alloc",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c5", "ready", structs.AllocClientStatusUnknown, structs.AllocDesiredStatusRun)
+				setDisconnect(a, 1*time.Second, true)
+				a.AllocStates = unknownState
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "expiring",
+		},
+		{
+			name: "failed reconnect stop ignored",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c6", "ready", structs.AllocClientStatusFailed, structs.AllocDesiredStatusStop)
+				a.AllocStates = unknownState
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "ignore",
+		},
+		{
+			name:     "disconnected unknown becomes untainted",
+			alloc:    makeAlloc("c7", "disconnected", structs.AllocClientStatusUnknown, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "untainted",
+		},
+		{
+			name:     "disconnected pending lost",
+			alloc:    makeAlloc("c8", "disconnected", structs.AllocClientStatusPending, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "lost",
+		},
+		{
+			name: "disconnected zero timeout lost",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c9", "disconnected", structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
+				a.Job = nil
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "lost",
+		},
+		{
+			name:     "disconnected grace period",
+			alloc:    makeAlloc("c10", "disconnected", structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "disconnecting",
+		},
+		{
+			name: "migrate flag",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c11", "ready", structs.AllocClientStatusPending, structs.AllocDesiredStatusRun)
+				a.DesiredTransition = structs.DesiredTransition{Migrate: pointer.Of(true)}
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "migrate",
+		},
+		{
+			name: "reconnecting via ready node",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c12", "ready", structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
+				a.AllocStates = unknownState
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "reconnecting",
+		},
+		{
+			name:     "untainted on non tainted node",
+			alloc:    makeAlloc("c13", "missing", structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "untainted",
+		},
+		{
+			name:     "gc node lost",
+			alloc:    makeAlloc("c14", "gc", structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "lost",
+		},
+		{
+			name: "terminal node unknown no replace",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c15", "down", structs.AllocClientStatusUnknown, structs.AllocDesiredStatusRun)
+				setDisconnect(a, 5*time.Minute, false)
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "untainted",
+		},
+		{
+			name: "terminal node running no replace",
+			alloc: func() *structs.Allocation {
+				a := makeAlloc("c16", "down", structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun)
+				setDisconnect(a, 5*time.Minute, false)
+				return a
+			}(),
+			nodeMap:  nodes,
+			expected: "disconnecting",
+		},
+		{
+			name:     "terminal node default lost",
+			alloc:    makeAlloc("c17", "down", structs.AllocClientStatusPending, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "lost",
+		},
+		{
+			name:     "other tainted node defaults to untainted",
+			alloc:    makeAlloc("c18", "initializing", structs.AllocClientStatusPending, structs.AllocDesiredStatusRun),
+			nodeMap:  nodes,
+			expected: "untainted",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			all := allocSet{tc.alloc.ID: tc.alloc}
+			state := ClusterState{Now: now, TaintedNodes: tc.nodeMap}
+
+			untainted, migrate, lost, disconnecting, reconnecting, ignore, expiring := all.filterByTainted(state)
+
+			buckets := map[string]allocSet{
+				"untainted":     untainted,
+				"migrate":       migrate,
+				"lost":          lost,
+				"disconnecting": disconnecting,
+				"reconnecting":  reconnecting,
+				"ignore":        ignore,
+				"expiring":      expiring,
+			}
+
+			for category, bucket := range buckets {
+				if category == tc.expected {
+					must.MapContainsKey(t, bucket, tc.alloc.ID)
+					must.MapLen(t, 1, bucket)
+					continue
+				}
+
+				must.MapNotContainsKey(t, bucket, tc.alloc.ID)
+				must.MapLen(t, 0, bucket)
+			}
+		})
+	}
+}
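
Reviewer note (not part of the patch): the refactor replaces the old branch ladder with an ordered rule table in which the first matching predicate wins and anything unmatched falls back to untainted. Below is a minimal, self-contained Go sketch of that pattern; the generic names (category, rule, classify) are hypothetical and are not the Nomad types.

    package main

    import "fmt"

    // category names a classification bucket, analogous to allocCategory.
    type category string

    // rule pairs a predicate with the category assigned when it matches,
    // analogous to classificationRule.
    type rule[T any] struct {
    	match func(T) bool
    	cat   category
    }

    // classify walks the rules in order and returns the first match, falling
    // back to a default: the same "first match wins, default category"
    // contract that filterByTainted now relies on.
    func classify[T any](rules []rule[T], v T, fallback category) category {
    	for _, r := range rules {
    		if r.match(v) {
    			return r.cat
    		}
    	}
    	return fallback
    }

    func main() {
    	rules := []rule[int]{
    		{match: func(n int) bool { return n < 0 }, cat: "negative"},
    		{match: func(n int) bool { return n == 0 }, cat: "zero"},
    	}
    	fmt.Println(classify(rules, -3, "positive")) // "negative": first rule wins
    	fmt.Println(classify(rules, 7, "positive"))  // "positive": fallback applies
    }

One property worth calling out in review: rule order is load-bearing. The later rules that call ctx.taintedNode.TerminalStatus() are nil-safe only because an earlier rule routes ctx.taintedNode == nil to lost, so reordering entries can change classification results or panic.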