diff --git a/pkg/apis/crds/karpenter.sh_nodepools.yaml b/pkg/apis/crds/karpenter.sh_nodepools.yaml index d7ebcc391d..ee9a8c6047 100644 --- a/pkg/apis/crds/karpenter.sh_nodepools.yaml +++ b/pkg/apis/crds/karpenter.sh_nodepools.yaml @@ -135,6 +135,22 @@ spec: memory leak protection, and disruption testing. pattern: ^(([0-9]+(s|m|h))+)|(Never)$ type: string + terminationGracePeriod: + description: |- + TerminationGracePeriod is the duration the controller will wait before forcefully terminating a node, measured from when deletion is first initiated. + Once the GracePeriod has expired, all pods on the node will be shutdown using the official non-graceful shutdown taint. + If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, + that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. + + + Warning: this bypasses any PDB or terminationGracePeriodSeconds value set for a Pod. + Requires: K8s 1.26 or higher: https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown + + + This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. + It can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. + If left undefined, the controller will wait indefinitely for pods to be drained. 
+ type: string type: object x-kubernetes-validations: - message: consolidateAfter cannot be combined with consolidationPolicy=WhenUnderutilized diff --git a/pkg/apis/v1beta1/labels.go b/pkg/apis/v1beta1/labels.go index bfb6e92e84..702849b2fa 100644 --- a/pkg/apis/v1beta1/labels.go +++ b/pkg/apis/v1beta1/labels.go @@ -43,10 +43,12 @@ const ( // Karpenter specific annotations const ( DoNotDisruptAnnotationKey = Group + "/do-not-disrupt" + DoNotConsolidateAnnotationKey = Group + "/do-not-consolidate" ProviderCompatabilityAnnotationKey = CompatabilityGroup + "/provider" ManagedByAnnotationKey = Group + "/managed-by" NodePoolHashAnnotationKey = Group + "/nodepool-hash" NodePoolHashVersionAnnotationKey = Group + "/nodepool-hash-version" + NodeExpirationTimeAnnotationKey = Group + "/node-expiration-time" ) // Karpenter specific finalizers diff --git a/pkg/apis/v1beta1/nodepool.go b/pkg/apis/v1beta1/nodepool.go index 68fb3ceb79..83a02db6e3 100644 --- a/pkg/apis/v1beta1/nodepool.go +++ b/pkg/apis/v1beta1/nodepool.go @@ -96,6 +96,19 @@ type Disruption struct { // +kubebuilder:validation:MaxItems=50 // +optional Budgets []Budget `json:"budgets,omitempty" hash:"ignore"` + // TerminationGracePeriod is the duration the controller will wait before forcefully terminating a node, measured from when deletion is first initiated. + // Once the GracePeriod has expired, all pods on the node will be shutdown using the official non-graceful shutdown taint. + // If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout, + // that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds. + // + // Warning: this bypasses any PDB or terminationGracePeriodSeconds value set for a Pod. 
+ // Requires: K8s 1.26 or higher: https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown + // + // This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period. + // It can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks. + // If left undefined, the controller will wait indefinitely for pods to be drained. + // +optional + TerminationGracePeriod *metav1.Duration `json:"terminationGracePeriod,omitempty"` } // Budget defines when Karpenter will restrict the diff --git a/pkg/apis/v1beta1/taints.go b/pkg/apis/v1beta1/taints.go index 6a9b932dfb..f9a280c368 100644 --- a/pkg/apis/v1beta1/taints.go +++ b/pkg/apis/v1beta1/taints.go @@ -22,6 +22,8 @@ import v1 "k8s.io/api/core/v1" const ( DisruptionTaintKey = Group + "/disruption" DisruptingNoScheduleTaintValue = "disrupting" + + DisruptionNonGracefulShutdownValue = "nodeshutdown" ) var ( @@ -32,6 +34,15 @@ var ( Effect: v1.TaintEffectNoSchedule, Value: DisruptingNoScheduleTaintValue, } + + // DisruptionNonGracefulShutdown is used by the deprovisioning controller to forcefully + // shut down a node. This does not respect graceful termination of any pods on the node. 
+ // https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown + DisruptionNonGracefulShutdown = v1.Taint{ + Key: v1.TaintNodeOutOfService, + Effect: v1.TaintEffectNoExecute, + Value: DisruptionNonGracefulShutdownValue, + } ) func IsDisruptingTaint(taint v1.Taint) bool { diff --git a/pkg/apis/v1beta1/zz_generated.deepcopy.go b/pkg/apis/v1beta1/zz_generated.deepcopy.go index bc19166432..b75216945c 100644 --- a/pkg/apis/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/v1beta1/zz_generated.deepcopy.go @@ -69,6 +69,11 @@ func (in *Disruption) DeepCopyInto(out *Disruption) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.TerminationGracePeriod != nil { + in, out := &in.TerminationGracePeriod, &out.TerminationGracePeriod + *out = new(metav1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Disruption. diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go index ab17e27f52..21842177d4 100644 --- a/pkg/controllers/controllers.go +++ b/pkg/controllers/controllers.go @@ -66,7 +66,7 @@ func NewControllers( informer.NewPodController(kubeClient, cluster), informer.NewNodePoolController(kubeClient, cluster), informer.NewNodeClaimController(kubeClient, cluster), - termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue), recorder), + termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder), metricspod.NewController(kubeClient), metricsnodepool.NewController(kubeClient), metricsnode.NewController(cluster), @@ -74,7 +74,7 @@ func NewControllers( nodeclaimconsistency.NewController(clock, kubeClient, recorder), nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder), nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider), - nodeclaimtermination.NewController(kubeClient, cloudProvider), + 
nodeclaimtermination.NewController(kubeClient, cloudProvider, recorder), nodeclaimdisruption.NewController(clock, kubeClient, cluster, cloudProvider), leasegarbagecollection.NewController(kubeClient), } diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index b3c33a6c7d..c30355c59b 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -20,6 +20,8 @@ import ( "bytes" "context" "fmt" + "os" + "strconv" "sync" "time" @@ -130,17 +132,21 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc // Attempt different disruption methods. We'll only let one method perform an action for _, m := range c.methods { c.recordRun(fmt.Sprintf("%T", m)) - success, err := c.disrupt(ctx, m) + _, err := c.disrupt(ctx, m) if err != nil { return reconcile.Result{}, fmt.Errorf("disrupting via %q, %w", m.Type(), err) } - if success { - return reconcile.Result{RequeueAfter: controller.Immediately}, nil - } + // ensure we attempt all disruption methods before completing reconciliation + // if success { + // return reconcile.Result{RequeueAfter: controller.Immediately}, nil + // } } // All methods did nothing, so return nothing to do - return reconcile.Result{RequeueAfter: pollingPeriod}, nil + // return reconcile.Result{RequeueAfter: pollingPeriod}, nil + + // loop complete, requeue immediately + return reconcile.Result{RequeueAfter: controller.Immediately}, nil } func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) { @@ -194,6 +200,31 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)) } + sleepTime := os.Getenv("DISRUPTION_ANNOTATION_SLEEP_SECONDS") + if len(sleepTime) > 0 { + sleepSeconds, err := strconv.Atoi(sleepTime) + if err == nil { + 
logging.FromContext(ctx).With("command-id", commandID).With("sleep", sleepTime).Infof("waiting for do-not-disrupt or do-not-consolidate pods that may have scheduled...") + time.Sleep(time.Duration(sleepSeconds * int(time.Second))) + } else { + logging.FromContext(ctx).With("command-id", commandID).With("sleep", sleepTime).With("variable", "DISRUPTION_ANNOTATION_SLEEP_SECONDS").Errorf("parsing disruption sleep time") + } + } + + // verify that the nodes we intend to disrupt do not have any do-not-disrupt pods, remove the DoNotSchedule disruption taint if they do + nodesToNotDisrupt, er := state.ValidateNoScheduleTaint(ctx, c.kubeClient, m.Type(), stateNodes...) + if er != nil { + return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, er), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)) + } + + // remove any nodes that had do-not-disrupt pods from the list of nodes we intend to disrupt + for _, n := range nodesToNotDisrupt { + logging.FromContext(ctx).With("command-id", commandID).Infof("avoiding disruption of node %s due to a do-not-disrupt or do-not-consolidate annotation race condition", n.Node.Name) + } + cmd.candidates = lo.Reject(cmd.candidates, func(c *Candidate, _ int) bool { + return lo.Contains(nodesToNotDisrupt, c.StateNode) + }) + var nodeClaimNames []string var err error if len(cmd.replacements) > 0 { diff --git a/pkg/controllers/disruption/types.go b/pkg/controllers/disruption/types.go index bf3d8be308..1998850d55 100644 --- a/pkg/controllers/disruption/types.go +++ b/pkg/controllers/disruption/types.go @@ -127,10 +127,11 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events return nil, fmt.Errorf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(po)) } } - if pdbKey, ok := pdbs.CanEvictPods(pods); !ok { - recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("PDB %q prevents pod evictions", pdbKey))...) 
- return nil, fmt.Errorf("pdb %q prevents pod evictions", pdbKey) - } + // if pdbKey, ok := pdbs.CanEvictPods(pods); !ok { + // logging.FromContext(ctx).Infof("ignoring non-evictable pdb: %q", pdbKey) + // // recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("PDB %q prevents pod evictions", pdbKey))...) + // // return nil, fmt.Errorf("pdb %q prevents pod evictions", pdbKey) + // } return &Candidate{ StateNode: node.DeepCopy(), instanceType: instanceType, diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go index f92a9ed2af..fd65dc5b3f 100644 --- a/pkg/controllers/node/termination/controller.go +++ b/pkg/controllers/node/termination/controller.go @@ -81,10 +81,20 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res if err := c.deleteAllNodeClaims(ctx, node); err != nil { return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err) } - if err := c.terminator.Taint(ctx, node); err != nil { - return reconcile.Result{}, fmt.Errorf("tainting node, %w", err) + + nodeGracePeriodExpirationTime, err := c.nodeExpirationTime(node) + if err != nil { + return reconcile.Result{}, err + } + if nodeGracePeriodExpirationTime != nil && time.Now().After(*nodeGracePeriodExpirationTime) { + if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNonGracefulShutdown); err != nil { + return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", v1.TaintNodeOutOfService, err) + } + } + if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNoScheduleTaint); err != nil { + return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", v1beta1.DisruptionTaintKey, err) } - if err := c.terminator.Drain(ctx, node); err != nil { + if err := c.terminator.Drain(ctx, node, nodeGracePeriodExpirationTime); err != nil { if !terminator.IsNodeDrainError(err) { return reconcile.Result{}, fmt.Errorf("draining node, %w", err) } @@ -102,6 +112,7 @@ func (c *Controller) 
Finalize(ctx context.Context, node *v1.Node) (reconcile.Res return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err) } } + return reconcile.Result{RequeueAfter: 1 * time.Second}, nil } // Be careful when removing this delete call in the Node termination flow @@ -157,6 +168,18 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error { return nil } +func (c *Controller) nodeExpirationTime(node *v1.Node) (*time.Time, error) { + if expirationTimeString, exists := node.ObjectMeta.Annotations[v1beta1.NodeExpirationTimeAnnotationKey]; exists { + expirationTime, err := time.Parse(time.RFC3339, expirationTimeString) + if err != nil { + return nil, fmt.Errorf("parsing %s annotation, %w", v1beta1.NodeExpirationTimeAnnotationKey, err) + } + return &expirationTime, nil + } + + return nil, nil +} + func (c *Controller) Builder(_ context.Context, m manager.Manager) operatorcontroller.Builder { return operatorcontroller.Adapt(controllerruntime. NewControllerManagedBy(m). diff --git a/pkg/controllers/node/termination/suite_test.go b/pkg/controllers/node/termination/suite_test.go index 7813adfeb8..e58c0fb841 100644 --- a/pkg/controllers/node/termination/suite_test.go +++ b/pkg/controllers/node/termination/suite_test.go @@ -44,7 +44,9 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/pkg/ptr" . 
"knative.dev/pkg/logging/testing" @@ -74,7 +76,7 @@ var _ = BeforeSuite(func() { cloudProvider = fake.NewCloudProvider() recorder = test.NewEventRecorder() queue = terminator.NewQueue(env.Client, recorder) - terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue), recorder) + terminationController = termination.NewController(env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder) }) var _ = AfterSuite(func() { @@ -84,9 +86,17 @@ var _ = AfterSuite(func() { var _ = Describe("Termination", func() { var node *v1.Node var nodeClaim *v1beta1.NodeClaim + var nodePool *v1beta1.NodePool BeforeEach(func() { + nodePool = test.NodePool() nodeClaim, node = test.NodeClaimAndNode(v1beta1.NodeClaim{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{v1beta1.TerminationFinalizer}}}) + nodeClaim.ObjectMeta.OwnerReferences = []metav1.OwnerReference{{ + APIVersion: v1beta1.Group + "/v1beta1", + Kind: "NodePool", + Name: nodePool.Name, + UID: uuid.NewUUID(), + }} node.Labels[v1beta1.NodePoolLabelKey] = test.NodePool().Name cloudProvider.CreatedNodeClaims[node.Spec.ProviderID] = nodeClaim }) @@ -105,14 +115,14 @@ var _ = Describe("Termination", func() { Context("Reconciliation", func() { It("should delete nodes", func() { - ExpectApplied(ctx, env.Client, node) + ExpectApplied(ctx, env.Client, node, nodeClaim) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) ExpectNotFound(ctx, env.Client, node) }) It("should delete nodeclaims associated with nodes", func() { - ExpectApplied(ctx, env.Client, node, nodeClaim) + ExpectApplied(ctx, env.Client, node, nodeClaim, nodeClaim) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, 
client.ObjectKeyFromObject(node)) @@ -128,7 +138,7 @@ var _ = Describe("Termination", func() { Finalizers: []string{v1beta1.TerminationFinalizer}, }, }) - ExpectApplied(ctx, env.Client, node) + ExpectApplied(ctx, env.Client, node, nodeClaim) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) nodes = append(nodes, node) @@ -158,7 +168,7 @@ var _ = Describe("Termination", func() { }, }) - ExpectApplied(ctx, env.Client, node, podNoEvict) + ExpectApplied(ctx, env.Client, node, nodeClaim, podNoEvict) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) @@ -173,7 +183,7 @@ var _ = Describe("Termination", func() { Tolerations: []v1.Toleration{{Key: v1beta1.DisruptionTaintKey, Operator: v1.TolerationOpEqual, Effect: v1beta1.DisruptionNoScheduleTaint.Effect, Value: v1beta1.DisruptionNoScheduleTaint.Value}}, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}, }) - ExpectApplied(ctx, env.Client, node, podEvict, podSkip) + ExpectApplied(ctx, env.Client, node, nodeClaim, podEvict, podSkip) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -201,7 +211,7 @@ var _ = Describe("Termination", func() { Tolerations: []v1.Toleration{{Key: v1beta1.DisruptionTaintKey, Operator: v1.TolerationOpExists, Effect: v1beta1.DisruptionNoScheduleTaint.Effect}}, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}, }) - ExpectApplied(ctx, env.Client, node, podEvict, podSkip) + ExpectApplied(ctx, env.Client, node, nodeClaim, podEvict, podSkip) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -230,7 +240,7 @@ var _ = Describe("Termination", func() { Tolerations: []v1.Toleration{{Key: v1.TaintNodeUnschedulable, Operator: v1.TolerationOpExists, Effect: v1.TaintEffectNoSchedule}}, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}, }) - ExpectApplied(ctx, env.Client, node, podEvict) + 
ExpectApplied(ctx, env.Client, node, nodeClaim, podEvict) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -258,7 +268,7 @@ var _ = Describe("Termination", func() { }, }) - ExpectApplied(ctx, env.Client, node, pod) + ExpectApplied(ctx, env.Client, node, nodeClaim, pod) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) @@ -290,7 +300,7 @@ var _ = Describe("Termination", func() { Phase: v1.PodFailed, }) - ExpectApplied(ctx, env.Client, node, podEvictPhaseSucceeded, podEvictPhaseFailed) + ExpectApplied(ctx, env.Client, node, nodeClaim, podEvictPhaseSucceeded, podEvictPhaseFailed) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) // Trigger Termination Controller, which should ignore these pods and delete the node @@ -314,7 +324,7 @@ var _ = Describe("Termination", func() { Phase: v1.PodRunning, }) - ExpectApplied(ctx, env.Client, node, podNoEvict, pdb) + ExpectApplied(ctx, env.Client, node, nodeClaim, podNoEvict, pdb) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -373,7 +383,7 @@ var _ = Describe("Termination", func() { BlockOwnerDeletion: ptr.Bool(true), }}}}) - ExpectApplied(ctx, env.Client, node, podEvict, podNodeCritical, podClusterCritical, podDaemonEvict, podDaemonNodeCritical, podDaemonClusterCritical) + ExpectApplied(ctx, env.Client, node, nodeClaim, podEvict, podNodeCritical, podClusterCritical, podDaemonEvict, podDaemonNodeCritical, podDaemonClusterCritical) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -430,7 +440,7 @@ var _ = Describe("Termination", func() { podNodeCritical := test.Pod(test.PodOptions{NodeName: node.Name, PriorityClassName: "system-node-critical", ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) 
podClusterCritical := test.Pod(test.PodOptions{NodeName: node.Name, PriorityClassName: "system-cluster-critical", ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - ExpectApplied(ctx, env.Client, node, podEvict, podNodeCritical, podClusterCritical) + ExpectApplied(ctx, env.Client, node, nodeClaim, podEvict, podNodeCritical, podClusterCritical) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -461,9 +471,8 @@ var _ = Describe("Termination", func() { ExpectNotFound(ctx, env.Client, node) }) It("should not evict static pods", func() { - ExpectApplied(ctx, env.Client, node) podEvict := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - ExpectApplied(ctx, env.Client, node, podEvict) + ExpectApplied(ctx, env.Client, node, nodeClaim, podEvict) podNoEvict := test.Pod(test.PodOptions{ NodeName: node.Name, @@ -507,7 +516,7 @@ var _ = Describe("Termination", func() { }) It("should not delete nodes until all pods are deleted", func() { pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - ExpectApplied(ctx, env.Client, node, pods[0], pods[1]) + ExpectApplied(ctx, env.Client, node, nodeClaim, pods[0], pods[1]) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -540,7 +549,7 @@ var _ = Describe("Termination", func() { }) It("should delete nodes with no underlying instance even if not fully drained", func() { pods := test.Pods(2, test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) - ExpectApplied(ctx, env.Client, node, pods[0], pods[1]) + ExpectApplied(ctx, env.Client, node, nodeClaim, pods[0], pods[1]) // Make Node NotReady since it's automatically marked as Ready on first deploy ExpectMakeNodesNotReady(ctx, env.Client, node) @@ -605,7 +614,7 @@ var _ = Describe("Termination", func() { It("should wait 
for pods to terminate", func() { pod := test.Pod(test.PodOptions{NodeName: node.Name, ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}}) fakeClock.SetTime(time.Now()) // make our fake clock match the pod creation time - ExpectApplied(ctx, env.Client, node, pod) + ExpectApplied(ctx, env.Client, node, nodeClaim, pod) // Before grace period, node should not delete Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -629,7 +638,7 @@ var _ = Describe("Termination", func() { OwnerReferences: defaultOwnerRefs, }, }) - ExpectApplied(ctx, env.Client, node, pod) + ExpectApplied(ctx, env.Client, node, nodeClaim, pod) // Trigger Termination Controller Expect(env.Client.Delete(ctx, node)).To(Succeed()) @@ -674,10 +683,141 @@ var _ = Describe("Termination", func() { g.Expect(pod.DeletionTimestamp.IsZero()).To(BeTrue()) }, ReconcilerPropagationTime, RequestInterval).Should(Succeed()) }) + It("should not taint a node that has no expiration annotations", func() { + minAvailable := intstr.FromInt(1) + labelSelector := map[string]string{test.RandomName(): test.RandomName()} + pdb := test.PodDisruptionBudget(test.PDBOptions{ + Labels: labelSelector, + // Don't let any pod evict + MinAvailable: &minAvailable, + }) + podNoEvict := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + Labels: labelSelector, + OwnerReferences: defaultOwnerRefs, + }, + Phase: v1.PodRunning, + }) + + node.ObjectMeta.Annotations = map[string]string{} + nodePool.Spec.Disruption.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, podNoEvict, pdb) + + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) + ExpectNodeExists(ctx, env.Client, node.Name) + + Expect(env.Client.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed()) + 
Expect(node.Spec.Taints).ToNot(ContainElement(v1beta1.DisruptionNonGracefulShutdown)) + }) + It("should not taint a node that has not yet exceeded it's terminationGracePeriod", func() { + minAvailable := intstr.FromInt(1) + labelSelector := map[string]string{test.RandomName(): test.RandomName()} + pdb := test.PodDisruptionBudget(test.PDBOptions{ + Labels: labelSelector, + // Don't let any pod evict + MinAvailable: &minAvailable, + }) + podNoEvict := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + Labels: labelSelector, + OwnerReferences: defaultOwnerRefs, + }, + Phase: v1.PodRunning, + }) + + node.ObjectMeta.Annotations = map[string]string{ + v1beta1.NodeExpirationTimeAnnotationKey: time.Now().Add(time.Minute * 5).Format(time.RFC3339), + } + nodePool.Spec.Disruption.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, podNoEvict, pdb) + + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) + ExpectNodeExists(ctx, env.Client, node.Name) + ExpectPodExists(ctx, env.Client, podNoEvict.Name, podNoEvict.Namespace) + Expect(node.Spec.Taints).ToNot(ContainElement(v1beta1.DisruptionNonGracefulShutdown)) + }) + It("should taint a node that has exceeded it's terminationGracePeriod", func() { + minAvailable := intstr.FromInt(1) + labelSelector := map[string]string{test.RandomName(): test.RandomName()} + pdb := test.PodDisruptionBudget(test.PDBOptions{ + Labels: labelSelector, + // Don't let any pod evict + MinAvailable: &minAvailable, + }) + podNoEvict := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{ + Labels: labelSelector, + OwnerReferences: defaultOwnerRefs, + }, + Phase: v1.PodRunning, + }) + + node.ObjectMeta.Annotations = map[string]string{ + 
v1beta1.NodeExpirationTimeAnnotationKey: time.Now().Add(time.Second * -10).Format(time.RFC3339), + } + nodePool.Spec.Disruption.TerminationGracePeriod = &metav1.Duration{Duration: time.Minute * 1} + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, podNoEvict, pdb) + + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) + ExpectNodeExists(ctx, env.Client, node.Name) + + Expect(env.Client.Get(ctx, types.NamespacedName{Name: node.Name}, node)).To(Succeed()) + Expect(node.Spec.Taints).To(ContainElement(v1beta1.DisruptionNonGracefulShutdown)) + }) + It("should preemptively delete pods to satisfy their terminationGracePeriodSeconds", func() { + pod := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}, + TerminationGracePeriodSeconds: lo.ToPtr(int64(30)), + }) + nodePool.Spec.Disruption.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + fakeClock.SetTime(time.Now()) + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, pod) + + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + fakeClock.SetTime(time.Now().Add(90 * time.Second)) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) + ExpectNodeExists(ctx, env.Client, node.Name) + ExpectDeleted(ctx, env.Client, pod) + + // After grace period, node should delete. The deletion timestamps are from etcd which we can't control, so + // to eliminate test-flakiness we reset the time to current time + 120 seconds instead of just advancing + // the clock by 120 seconds. 
+ fakeClock.SetTime(time.Now().Add(120 * time.Second)) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectNotFound(ctx, env.Client, node) + }) + It("should not delete pods if their terminationGracePeriodSeconds will not expire before the node's terminationGracePeriod", func() { + pod := test.Pod(test.PodOptions{ + NodeName: node.Name, + ObjectMeta: metav1.ObjectMeta{OwnerReferences: defaultOwnerRefs}, + TerminationGracePeriodSeconds: lo.ToPtr(int64(60)), + }) + nodePool.Spec.Disruption.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + fakeClock.SetTime(time.Now()) + ExpectApplied(ctx, env.Client, node, nodeClaim, nodePool, pod) + + Expect(env.Client.Delete(ctx, node)).To(Succeed()) + fakeClock.SetTime(time.Now().Add(90 * time.Second)) + ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) + ExpectReconcileSucceeded(ctx, queue, client.ObjectKey{}) + ExpectNodeExists(ctx, env.Client, node.Name) + ExpectPodExists(ctx, env.Client, pod.Name, pod.Namespace) + }) }) Context("Metrics", func() { It("should fire the terminationSummary metric when deleting nodes", func() { - ExpectApplied(ctx, env.Client, node) + ExpectApplied(ctx, env.Client, node, nodeClaim) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) @@ -687,7 +827,7 @@ var _ = Describe("Termination", func() { Expect(m.GetSummary().GetSampleCount()).To(BeNumerically("==", 1)) }) It("should fire the nodesTerminated counter metric when deleting nodes", func() { - ExpectApplied(ctx, env.Client, node) + ExpectApplied(ctx, env.Client, node, nodeClaim) Expect(env.Client.Delete(ctx, node)).To(Succeed()) node = ExpectNodeExists(ctx, env.Client, node.Name) ExpectReconcileSucceeded(ctx, terminationController, client.ObjectKeyFromObject(node)) diff --git 
a/pkg/controllers/node/termination/terminator/events/events.go b/pkg/controllers/node/termination/terminator/events/events.go index 84c4b07bc0..6aa4385e89 100644 --- a/pkg/controllers/node/termination/terminator/events/events.go +++ b/pkg/controllers/node/termination/terminator/events/events.go @@ -34,6 +34,16 @@ func EvictPod(pod *v1.Pod) events.Event { } } +func DeletePod(pod *v1.Pod) events.Event { + return events.Event{ + InvolvedObject: pod, + Type: v1.EventTypeNormal, + Reason: "Deleted", + Message: fmt.Sprintf("Deleted pod regardless of PDBs and lifecycle hooks, %v seconds before node termination to accommodate its terminationGracePeriodSeconds", pod.Spec.TerminationGracePeriodSeconds), + DedupeValues: []string{pod.Name}, + } +} + func NodeFailedToDrain(node *v1.Node, err error) events.Event { return events.Event{ InvolvedObject: node, @@ -43,3 +53,13 @@ func NodeFailedToDrain(node *v1.Node, err error) events.Event { DedupeValues: []string{node.Name}, } } + +func NodeTerminationGracePeriod(node *v1.Node, expirationTime string) events.Event { + return events.Event{ + InvolvedObject: node, + Type: v1.EventTypeWarning, + Reason: "TerminationGracePeriodExpiration", + Message: fmt.Sprintf("Node will have the out-of-service taint applied at: %s", expirationTime), + DedupeValues: []string{node.Name}, + } +} diff --git a/pkg/controllers/node/termination/terminator/eviction.go b/pkg/controllers/node/termination/terminator/eviction.go index 2a194d1e94..54ddbf9e37 100644 --- a/pkg/controllers/node/termination/terminator/eviction.go +++ b/pkg/controllers/node/termination/terminator/eviction.go @@ -137,9 +137,11 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R if shutdown { return reconcile.Result{}, fmt.Errorf("EvictionQueue is broken and has shutdown") } + qk := item.(QueueKey) defer q.RateLimitingInterface.Done(qk) - // Evict pod + + // Evict the pod if q.Evict(ctx, qk) { q.RateLimitingInterface.Forget(qk) q.mu.Lock() @@ -147,12 
+149,13 @@ func (q *Queue) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.R q.mu.Unlock() return reconcile.Result{RequeueAfter: controller.Immediately}, nil } + // Requeue pod if eviction failed q.RateLimitingInterface.AddRateLimited(qk) return reconcile.Result{RequeueAfter: controller.Immediately}, nil } -// Evict returns true if successful eviction call, and false if not an eviction-related error +// Evict returns true if successful eviction call, and false if there was an eviction-related error func (q *Queue) Evict(ctx context.Context, key QueueKey) bool { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("pod", key.NamespacedName)) if err := q.kubeClient.SubResource("eviction").Create(ctx, diff --git a/pkg/controllers/node/termination/terminator/suite_test.go b/pkg/controllers/node/termination/terminator/suite_test.go index bd756ac601..a55d3a2021 100644 --- a/pkg/controllers/node/termination/terminator/suite_test.go +++ b/pkg/controllers/node/termination/terminator/suite_test.go @@ -20,6 +20,7 @@ import ( "context" "sync" "testing" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -29,7 +30,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" + clock "k8s.io/utils/clock/testing" . 
"knative.dev/pkg/logging/testing" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator" @@ -47,6 +50,8 @@ var recorder *test.EventRecorder var queue *terminator.Queue var pdb *policyv1.PodDisruptionBudget var pod *v1.Pod +var fakeClock *clock.FakeClock +var terminatorInstance *terminator.Terminator func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -55,10 +60,12 @@ func TestAPIs(t *testing.T) { } var _ = BeforeSuite(func() { + fakeClock = clock.NewFakeClock(time.Now()) env = test.NewEnvironment(scheme.Scheme, test.WithCRDs(apis.CRDs...)) ctx = options.ToContext(ctx, test.Options(test.OptionsFields{FeatureGates: test.FeatureGates{Drift: lo.ToPtr(true)}})) recorder = test.NewEventRecorder() queue = terminator.NewQueue(env.Client, recorder) + terminatorInstance = terminator.NewTerminator(fakeClock, env.Client, queue, recorder) }) var _ = AfterSuite(func() { @@ -154,4 +161,32 @@ var _ = Describe("Eviction/Queue", func() { } }) }) + + Context("Pod Deletion API", func() { + It("should not delete a pod with no nodeExpirationTime", func() { + ExpectApplied(ctx, env.Client, pod) + + Expect(terminatorInstance.DeleteExpiringPods(ctx, []*v1.Pod{pod}, nil)).To(Succeed()) + ExpectExists(ctx, env.Client, pod) + Expect(recorder.Calls("Deleted")).To(Equal(0)) + }) + It("should not delete a pod with terminationGracePeriodSeconds still remaining before nodeExpirationTime", func() { + pod.Spec.TerminationGracePeriodSeconds = lo.ToPtr[int64](60) + ExpectApplied(ctx, env.Client, pod) + + nodeExpirationTime := time.Now().Add(time.Minute * 5) + Expect(terminatorInstance.DeleteExpiringPods(ctx, []*v1.Pod{pod}, &nodeExpirationTime)).To(Succeed()) + ExpectExists(ctx, env.Client, pod) + Expect(recorder.Calls("Deleted")).To(Equal(0)) + }) + It("should delete a pod with less than terminationGracePeriodSeconds remaining before nodeExpirationTime", func() { + pod.Spec.TerminationGracePeriodSeconds = lo.ToPtr[int64](120) + 
ExpectApplied(ctx, env.Client, pod) + + nodeExpirationTime := time.Now().Add(time.Minute * 1) + Expect(terminatorInstance.DeleteExpiringPods(ctx, []*v1.Pod{pod}, &nodeExpirationTime)).To(Succeed()) + ExpectNotFound(ctx, env.Client, pod) + Expect(recorder.Calls("Deleted")).To(Equal(1)) + }) + }) }) diff --git a/pkg/controllers/node/termination/terminator/terminator.go b/pkg/controllers/node/termination/terminator/terminator.go index c2ed1ec10d..f5605bbf04 100644 --- a/pkg/controllers/node/termination/terminator/terminator.go +++ b/pkg/controllers/node/termination/terminator/terminator.go @@ -19,17 +19,20 @@ package terminator import ( "context" "fmt" + "time" "github.com/samber/lo" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/clock" "knative.dev/pkg/logging" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/karpenter/pkg/events" nodeutil "sigs.k8s.io/karpenter/pkg/utils/node" - "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" podutil "sigs.k8s.io/karpenter/pkg/utils/pod" ) @@ -37,28 +40,30 @@ type Terminator struct { clock clock.Clock kubeClient client.Client evictionQueue *Queue + recorder events.Recorder } -func NewTerminator(clk clock.Clock, kubeClient client.Client, eq *Queue) *Terminator { +func NewTerminator(clk clock.Clock, kubeClient client.Client, eq *Queue, recorder events.Recorder) *Terminator { return &Terminator{ clock: clk, kubeClient: kubeClient, evictionQueue: eq, + recorder: recorder, } } -// Taint idempotently adds the karpenter.sh/disruption taint to a node with a NodeClaim -func (t *Terminator) Taint(ctx context.Context, node *v1.Node) error { +// Taint idempotently adds an arbitrary taint to a node with a NodeClaim +func (t *Terminator) Taint(ctx context.Context, node *v1.Node, taint v1.Taint) error { stored := node.DeepCopy() - // If the taint already has the 
karpenter.sh/disruption=disrupting:NoSchedule taint, do nothing. + // If the node already has the correct taint (key, value, and effect), do nothing. if _, ok := lo.Find(node.Spec.Taints, func(t v1.Taint) bool { - return v1beta1.IsDisruptingTaint(t) + return t.MatchTaint(&taint) && t.Value == taint.Value && t.Effect == taint.Effect }); !ok { - // If the taint key exists (but with a different value or effect), remove it. + // Otherwise, if the taint key exists (but with a different value or effect), remove it. node.Spec.Taints = lo.Reject(node.Spec.Taints, func(t v1.Taint, _ int) bool { - return t.Key == v1beta1.DisruptionTaintKey + return t.Key == taint.Key }) - node.Spec.Taints = append(node.Spec.Taints, v1beta1.DisruptionNoScheduleTaint) + node.Spec.Taints = append(node.Spec.Taints, taint) } // Adding this label to the node ensures that the node is removed from the load-balancer target group // while it is draining and before it is terminated. This prevents 500s coming prior to health check @@ -72,23 +77,29 @@ if err := t.kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil { return err } - logging.FromContext(ctx).Infof("tainted node") + logging.FromContext(ctx).With("taint.Key", taint.Key).With("taint.Effect", taint.Effect).With("taint.Value", taint.Value).Infof("tainted node") } return nil } // Drain evicts pods from the node and returns true when all pods are evicted // https://kubernetes.io/docs/concepts/architecture/nodes/#graceful-node-shutdown -func (t *Terminator) Drain(ctx context.Context, node *v1.Node) error { +func (t *Terminator) Drain(ctx context.Context, node *v1.Node, nodeGracePeriodExpirationTime *time.Time) error { pods, err := nodeutil.GetPods(ctx, t.kubeClient, node) if err != nil { return fmt.Errorf("listing pods on node, %w", err) } + // evictablePods are pods that aren't yet terminating and are eligible to have the eviction API called against them 
evictablePods := lo.Filter(pods, func(p *v1.Pod, _ int) bool { return podutil.IsEvictable(p) }) + + if err := t.DeleteExpiringPods(ctx, evictablePods, nodeGracePeriodExpirationTime); err != nil { + return err + } + t.Evict(evictablePods) - // podsWaitingEvictionCount are the number of pods that either haven't had eviction called against them yet + // podsWaitingEvictionCount is the number of pods that either haven't had eviction called against them yet // or are still actively terminated and haven't exceeded their termination grace period yet podsWaitingEvictionCount := lo.CountBy(pods, func(p *v1.Pod) bool { return podutil.IsWaitingEviction(p, t.clock) }) if podsWaitingEvictionCount > 0 { @@ -115,18 +126,55 @@ func (t *Terminator) Evict(pods []*v1.Pod) { } } } - // 2. Evict in order: - // a. non-critical non-daemonsets - // b. non-critical daemonsets - // c. critical non-daemonsets - // d. critical daemonsets - if len(nonCriticalNonDaemon) != 0 { - t.evictionQueue.Add(nonCriticalNonDaemon...) - } else if len(nonCriticalDaemon) != 0 { - t.evictionQueue.Add(nonCriticalDaemon...) - } else if len(criticalNonDaemon) != 0 { - t.evictionQueue.Add(criticalNonDaemon...) - } else if len(criticalDaemon) != 0 { - t.evictionQueue.Add(criticalDaemon...) + + // EvictInOrder evicts only the first list of pods which is not empty + // future Evict calls will catch later lists of pods that were not initially evicted + t.EvictInOrder( + nonCriticalNonDaemon, + nonCriticalDaemon, + criticalNonDaemon, + criticalDaemon, + ) +} + +func (t *Terminator) EvictInOrder(pods ...[]*v1.Pod) { + for _, podList := range pods { + if len(podList) > 0 { + // evict the first list of pods that is not empty, ignore the rest + t.evictionQueue.Add(podList...) 
+ return + } + } +} + +func (t *Terminator) DeleteExpiringPods(ctx context.Context, pods []*v1.Pod, nodeGracePeriodExpirationTime *time.Time) error { + for _, pod := range pods { + // check if the node has an expiration time and the pod needs to be deleted + deleteTime := t.podDeleteTimeWithGracePeriod(nodeGracePeriodExpirationTime, pod) + if deleteTime != nil && t.clock.Now().After(*deleteTime) { + t.recorder.Publish(terminatorevents.DeletePod(pod)) + + // delete pod proactively to give as much of its terminationGracePeriodSeconds as possible for deletion + if err := t.kubeClient.Delete(ctx, pod); err != nil { + if !apierrors.IsNotFound(err) { // ignore 404, not a problem + logging.FromContext(ctx).With("namespace", pod.Namespace).With("name", pod.Name).Errorf("deleting pod, %s", err) + } + return err + } + logging.FromContext(ctx).With("namespace", pod.Namespace).With("name", pod.Name).With("pod.terminationGracePeriodSeconds", *pod.Spec.TerminationGracePeriodSeconds).With("nodeclaim.expirationTime", nodeGracePeriodExpirationTime).Infof("deleted pod") + } + } + return nil +} + +// if a pod should be deleted to give it the full terminationGracePeriodSeconds of time before the node will shut down, return the time the pod should be deleted +func (t *Terminator) podDeleteTimeWithGracePeriod(nodeGracePeriodExpirationTime *time.Time, pod *v1.Pod) *time.Time { + if nodeGracePeriodExpirationTime == nil || pod.Spec.TerminationGracePeriodSeconds == nil { // k8s defaults to 30s, so we should never see a nil TerminationGracePeriodSeconds + return nil } + + // calculate the time the pod should be deleted to allow its full grace period for termination, equal to its terminationGracePeriodSeconds before the node's expiration time + // eg: if a node will be force terminated in 30m, but the current pod has a grace period of 45m, we return a time of 15m ago + deleteTime := nodeGracePeriodExpirationTime.Add(time.Duration(*pod.Spec.TerminationGracePeriodSeconds) * time.Second * -1) + 
return &deleteTime } diff --git a/pkg/controllers/nodeclaim/termination/controller.go b/pkg/controllers/nodeclaim/termination/controller.go index ae1cf978be..4f10d649e2 100644 --- a/pkg/controllers/nodeclaim/termination/controller.go +++ b/pkg/controllers/nodeclaim/termination/controller.go @@ -39,6 +39,8 @@ import ( "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/cloudprovider" + terminatorevents "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator/events" + "sigs.k8s.io/karpenter/pkg/events" operatorcontroller "sigs.k8s.io/karpenter/pkg/operator/controller" nodeclaimutil "sigs.k8s.io/karpenter/pkg/utils/nodeclaim" ) @@ -50,13 +52,15 @@ var _ operatorcontroller.FinalizingTypedController[*v1beta1.NodeClaim] = (*Contr type Controller struct { kubeClient client.Client cloudProvider cloudprovider.CloudProvider + recorder events.Recorder } // NewController is a constructor for the NodeClaim Controller -func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) operatorcontroller.Controller { +func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, recorder events.Recorder) operatorcontroller.Controller { return operatorcontroller.Typed[*v1beta1.NodeClaim](kubeClient, &Controller{ kubeClient: kubeClient, cloudProvider: cloudProvider, + recorder: recorder, }) } @@ -68,6 +72,7 @@ func (c *Controller) Reconcile(_ context.Context, _ *v1beta1.NodeClaim) (reconci func (c *Controller) Finalize(ctx context.Context, nodeClaim *v1beta1.NodeClaim) (reconcile.Result, error) { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("node", nodeClaim.Status.NodeName, "provider-id", nodeClaim.Status.ProviderID)) stored := nodeClaim.DeepCopy() + if !controllerutil.ContainsFinalizer(nodeClaim, v1beta1.TerminationFinalizer) { return reconcile.Result{}, nil } @@ -76,6 +81,11 @@ func (c *Controller) Finalize(ctx context.Context, nodeClaim *v1beta1.NodeClaim) return reconcile.Result{}, err } 
for _, node := range nodes { + err = c.ensureTerminationGracePeriodExpirationAnnotation(ctx, node, nodeClaim) + if err != nil { + return reconcile.Result{}, err + } + // If we still get the Node, but it's already marked as terminating, we don't need to call Delete again if node.DeletionTimestamp.IsZero() { // We delete nodes to trigger the node finalization and deletion flow @@ -115,6 +125,42 @@ func (c *Controller) Finalize(ctx context.Context, nodeClaim *v1beta1.NodeClaim) return reconcile.Result{}, nil } +func (c *Controller) ensureTerminationGracePeriodExpirationAnnotation(ctx context.Context, node *v1.Node, nodeClaim *v1beta1.NodeClaim) error { + + // if the expiration annotation is already set, we don't need to do anything + if _, exists := node.ObjectMeta.Annotations[v1beta1.NodeExpirationTimeAnnotationKey]; exists { + return nil + } + + nodePool, err := nodeclaimutil.NodePoolForNodeClaim(ctx, c.kubeClient, nodeClaim) + if err != nil { + return fmt.Errorf("ensuring node expiration annotation, %w", err) + } + + if nodePool != nil && nodePool.Spec.Disruption.TerminationGracePeriod != nil && nodeClaim.ObjectMeta.DeletionTimestamp != nil { + expirationTimeString := nodeClaim.DeletionTimestamp.Time.Add(nodePool.Spec.Disruption.TerminationGracePeriod.Duration).Format(time.RFC3339) + return c.annotateTerminationGracePeriodExpirationTime(ctx, node, expirationTimeString) + } + + return nil +} + +func (c *Controller) annotateTerminationGracePeriodExpirationTime(ctx context.Context, node *v1.Node, expirationTime string) error { + stored := node.DeepCopy() + if node.ObjectMeta.Annotations == nil { + node.ObjectMeta.Annotations = map[string]string{} + } + node.ObjectMeta.Annotations[v1beta1.NodeExpirationTimeAnnotationKey] = expirationTime + + if err := c.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { + return client.IgnoreNotFound(fmt.Errorf("adding %s annotation, %w", v1beta1.NodeExpirationTimeAnnotationKey, err)) + } + 
logging.FromContext(ctx).With(v1beta1.NodeExpirationTimeAnnotationKey, expirationTime).Infof("annotated node") + c.recorder.Publish(terminatorevents.NodeTerminationGracePeriod(node, expirationTime)) + + return nil +} + func (*Controller) Name() string { return "nodeclaim.termination" } diff --git a/pkg/controllers/nodeclaim/termination/suite_test.go b/pkg/controllers/nodeclaim/termination/suite_test.go index 0d8b29150f..5e8ae58e8f 100644 --- a/pkg/controllers/nodeclaim/termination/suite_test.go +++ b/pkg/controllers/nodeclaim/termination/suite_test.go @@ -28,17 +28,18 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" . "knative.dev/pkg/logging/testing" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/karpenter/pkg/cloudprovider" nodeclaimtermination "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/termination" "sigs.k8s.io/karpenter/pkg/apis" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" - "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/cloudprovider/fake" nodeclaimlifecycle "sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle" "sigs.k8s.io/karpenter/pkg/events" @@ -56,6 +57,7 @@ var fakeClock *clock.FakeClock var cloudProvider *fake.CloudProvider var nodeClaimLifecycleController controller.Controller var nodeClaimTerminationController controller.Controller +var recorder *test.EventRecorder func TestAPIs(t *testing.T) { ctx = TestContextWithLogger(t) @@ -72,8 +74,9 @@ var _ = BeforeSuite(func() { })) ctx = options.ToContext(ctx, test.Options()) cloudProvider = fake.NewCloudProvider() + recorder = test.NewEventRecorder() nodeClaimLifecycleController = nodeclaimlifecycle.NewController(fakeClock, env.Client, cloudProvider, events.NewRecorder(&record.FakeRecorder{})) - nodeClaimTerminationController = 
nodeclaimtermination.NewController(env.Client, cloudProvider) + nodeClaimTerminationController = nodeclaimtermination.NewController(env.Client, cloudProvider, recorder) }) var _ = AfterSuite(func() { @@ -100,6 +103,12 @@ var _ = Describe("Termination", func() { Finalizers: []string{ v1beta1.TerminationFinalizer, }, + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: v1beta1.Group + "/v1beta1", + Kind: "NodePool", + Name: nodePool.Name, + UID: uuid.NewUUID(), + }}, }, Spec: v1beta1.NodeClaimSpec{ Resources: v1beta1.ResourceRequirements{ @@ -249,4 +258,61 @@ var _ = Describe("Termination", func() { _, err = cloudProvider.Get(ctx, nodeClaim.Status.ProviderID) Expect(cloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue()) }) + It("should not annotate the node if the NodeClaim has no terminationGracePeriod", func() { + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectReconcileSucceeded(ctx, nodeClaimLifecycleController, client.ObjectKeyFromObject(nodeClaim)) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + _, err := cloudProvider.Get(ctx, nodeClaim.Status.ProviderID) + Expect(err).ToNot(HaveOccurred()) + + node := test.NodeClaimLinkedNode(nodeClaim) + ExpectApplied(ctx, env.Client, node) + + ExpectReconcileSucceeded(ctx, nodeClaimTerminationController, client.ObjectKeyFromObject(nodeClaim)) // triggers the node deletion + node = ExpectExists(ctx, env.Client, node) + Expect(node.ObjectMeta.Annotations).To(BeNil()) + }) + It("should annotate the node if the NodeClaim has a terminationGracePeriod", func() { + nodePool.Spec.Disruption.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectReconcileSucceeded(ctx, nodeClaimLifecycleController, client.ObjectKeyFromObject(nodeClaim)) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + _, err := cloudProvider.Get(ctx, nodeClaim.Status.ProviderID) + Expect(err).ToNot(HaveOccurred()) + + node := 
test.NodeClaimLinkedNode(nodeClaim) + ExpectApplied(ctx, env.Client, node) + + Expect(env.Client.Delete(ctx, nodeClaim)).To(Succeed()) + ExpectReconcileSucceeded(ctx, nodeClaimTerminationController, client.ObjectKeyFromObject(nodeClaim)) // triggers the node deletion + node = ExpectExists(ctx, env.Client, node) + + _, annotationExists := node.ObjectMeta.Annotations[v1beta1.NodeExpirationTimeAnnotationKey] + Expect(annotationExists).To(BeTrue()) + }) + It("should not change the annotation if the NodeClaim has a terminationGracePeriod and the annotation already exists", func() { + nodePool.Spec.Disruption.TerminationGracePeriod = &metav1.Duration{Duration: time.Second * 300} + ExpectApplied(ctx, env.Client, nodePool, nodeClaim) + ExpectReconcileSucceeded(ctx, nodeClaimLifecycleController, client.ObjectKeyFromObject(nodeClaim)) + + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + _, err := cloudProvider.Get(ctx, nodeClaim.Status.ProviderID) + Expect(err).ToNot(HaveOccurred()) + + node := test.NodeClaimLinkedNode(nodeClaim) + node.ObjectMeta.Annotations = map[string]string{ + v1beta1.NodeExpirationTimeAnnotationKey: "2024-04-01T12:00:00-05:00", + } + ExpectApplied(ctx, env.Client, node) + + Expect(env.Client.Delete(ctx, nodeClaim)).To(Succeed()) + ExpectReconcileSucceeded(ctx, nodeClaimTerminationController, client.ObjectKeyFromObject(nodeClaim)) // triggers the node deletion + node = ExpectExists(ctx, env.Client, node) + + Expect(node.ObjectMeta.Annotations).To(Equal(map[string]string{ + v1beta1.NodeExpirationTimeAnnotationKey: "2024-04-01T12:00:00-05:00", + })) + }) }) diff --git a/pkg/controllers/state/statenode.go b/pkg/controllers/state/statenode.go index a3b86a23e1..5106769bc8 100644 --- a/pkg/controllers/state/statenode.go +++ b/pkg/controllers/state/statenode.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/karpenter/pkg/apis/v1beta1" + "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/operator/options" 
"sigs.k8s.io/karpenter/pkg/scheduling" nodeutils "sigs.k8s.io/karpenter/pkg/utils/node" @@ -418,3 +419,58 @@ func RequireNoScheduleTaint(ctx context.Context, kubeClient client.Client, addTa } return multiErr } + +// returns a list of node names that had their disruption taint removed +func ValidateNoScheduleTaint(ctx context.Context, kubeClient client.Client, disruptionReason string, nodes ...*StateNode) ([]*StateNode, error) { + var multiErr error + nodesNotDisrupted := []*StateNode{} + + for _, n := range nodes { + if n.Node == nil || n.NodeClaim == nil { + continue + } + node := &v1.Node{} + if err := kubeClient.Get(ctx, client.ObjectKey{Name: n.Node.Name}, node); client.IgnoreNotFound(err) != nil { + multiErr = multierr.Append(multiErr, fmt.Errorf("getting node, %w", err)) + } + + _, hasTaint := lo.Find(node.Spec.Taints, func(taint v1.Taint) bool { + return v1beta1.IsDisruptingTaint(taint) + }) + + if hasTaint { + doNotDisruptNode := false + pods, err := n.Pods(ctx, kubeClient) + if err != nil { + multiErr = multierr.Append(multiErr, fmt.Errorf("getting pods for node %s, %w", node.Name, err)) + } + for _, p := range pods { + if _, ok := p.Annotations[v1beta1.DoNotDisruptAnnotationKey]; ok { + doNotDisruptNode = true + fmt.Printf("removing disruption taint from node %s, pod %s/%s has a %s annotation\n", node.Name, p.Namespace, p.Name, v1beta1.DoNotDisruptAnnotationKey) + break + } + + if _, ok := p.Annotations[v1beta1.DoNotConsolidateAnnotationKey]; ok { + if disruptionReason == metrics.ConsolidationReason || disruptionReason == metrics.EmptinessReason { + doNotDisruptNode = true + fmt.Printf("removing disruption taint from node %s, pod %s/%s has a %s annotation and disruption reason is %s\n", node.Name, p.Namespace, p.Name, v1beta1.DoNotConsolidateAnnotationKey, disruptionReason) + break + } + } + } + + stored := node.DeepCopy() + if doNotDisruptNode { + nodesNotDisrupted = append(nodesNotDisrupted, n) + node.Spec.Taints = lo.Reject(node.Spec.Taints, func(t 
v1.Taint, _ int) bool { + return t.Key == v1beta1.DisruptionTaintKey + }) + if err := kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil { + multiErr = multierr.Append(multiErr, fmt.Errorf("patching node %s, %w", node.Name, err)) + } + } + } + } + return nodesNotDisrupted, multiErr +} diff --git a/pkg/test/nodes.go b/pkg/test/nodes.go index 2d9a2a1f83..37f00bf259 100644 --- a/pkg/test/nodes.go +++ b/pkg/test/nodes.go @@ -74,6 +74,14 @@ func NodeClaimLinkedNode(nodeClaim *v1beta1.NodeClaim) *v1.Node { Labels: nodeClaim.Labels, Annotations: nodeClaim.Annotations, Finalizers: nodeClaim.Finalizers, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "karpenter.sh/v1beta1", + Kind: "NodeClaim", + Name: nodeClaim.Name, + UID: "6ad01199-df8b-49dc-abde-5673d730f735", + }, + }, }, Taints: append(nodeClaim.Spec.Taints, nodeClaim.Spec.StartupTaints...), Capacity: nodeClaim.Status.Capacity, diff --git a/pkg/utils/nodeclaim/nodeclaim.go b/pkg/utils/nodeclaim/nodeclaim.go index 76cfb2c87e..46e41cd576 100644 --- a/pkg/utils/nodeclaim/nodeclaim.go +++ b/pkg/utils/nodeclaim/nodeclaim.go @@ -20,10 +20,12 @@ import ( "context" "errors" "fmt" + "strings" "github.com/samber/lo" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "knative.dev/pkg/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -222,6 +224,31 @@ func NewFromNode(node *v1.Node) *v1beta1.NodeClaim { return nc } +// NodePoolForNodeClaim is a helper function that takes a v1beta1.NodeClaim and finds its matching NodePool using its ownerRefs +func NodePoolForNodeClaim(ctx context.Context, c client.Client, nodeClaim *v1beta1.NodeClaim) (*v1beta1.NodePool, error) { + nodePool := &v1beta1.NodePool{} + nodePoolRef, ok := lo.Find(nodeClaim.OwnerReferences, func(o metav1.OwnerReference) bool { + // verify that we're finding Karpenter's NodePool of any version permutation + groupVersion, err := 
schema.ParseGroupVersion(o.APIVersion) + if err != nil { + return false + } + return strings.HasPrefix(groupVersion.Group, v1beta1.Group) && o.Kind == "NodePool" + }) + if !ok { + return nil, fmt.Errorf("could not find NodePool for NodeClaim") + } + + nodePoolName := types.NamespacedName{ + Name: nodePoolRef.Name, + } + if err := c.Get(ctx, nodePoolName, nodePool); err != nil { + return nil, fmt.Errorf("retrieving NodePool, %w", err) + } + + return nodePool, nil +} + func UpdateNodeOwnerReferences(nodeClaim *v1beta1.NodeClaim, node *v1.Node) *v1.Node { node.OwnerReferences = append(node.OwnerReferences, metav1.OwnerReference{ APIVersion: v1beta1.SchemeGroupVersion.String(),