Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/apis/crds/karpenter.sh_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ spec:
memory leak protection, and disruption testing.
pattern: ^(([0-9]+(s|m|h))+)|(Never)$
type: string
terminationGracePeriod:
description: |-
TerminationGracePeriod is the duration the controller will wait before forcefully terminating a node, measured from when deletion is first initiated.
Once the GracePeriod has expired, all pods on the node will be shut down using the official non-graceful shutdown taint.
If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout,
that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds.


Warning: this bypasses any PDB or terminationGracePeriodSeconds value set for a Pod.
Requires: K8s 1.26 or higher: https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown


This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period.
It can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks.
If left undefined, the controller will wait indefinitely for pods to be drained.
type: string
type: object
x-kubernetes-validations:
- message: consolidateAfter cannot be combined with consolidationPolicy=WhenUnderutilized
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/v1beta1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ const (
// Karpenter specific annotations
const (
	// DoNotDisruptAnnotationKey, when present on a pod, opts that pod's node out of
	// voluntary disruption (the disruption controller rejects candidates carrying such pods).
	DoNotDisruptAnnotationKey = Group + "/do-not-disrupt"
	// DoNotConsolidateAnnotationKey — presumably the consolidation-specific counterpart
	// of do-not-disrupt; NOTE(review): consumers are not visible here, confirm semantics.
	DoNotConsolidateAnnotationKey = Group + "/do-not-consolidate"
	// ProviderCompatabilityAnnotationKey records provider compatibility information.
	// NOTE(review): "Compatability" is a misspelling, but renaming the exported
	// identifier would break existing callers, so it is kept as-is.
	ProviderCompatabilityAnnotationKey = CompatabilityGroup + "/provider"
	// ManagedByAnnotationKey — NOTE(review): semantics not visible in this file;
	// appears to mark resources managed by Karpenter. Confirm against consumers.
	ManagedByAnnotationKey = Group + "/managed-by"
	// NodePoolHashAnnotationKey / NodePoolHashVersionAnnotationKey —
	// NOTE(review): hash bookkeeping annotations (likely drift detection); verify against consumers.
	NodePoolHashAnnotationKey = Group + "/nodepool-hash"
	NodePoolHashVersionAnnotationKey = Group + "/nodepool-hash-version"
	// NodeExpirationTimeAnnotationKey holds an RFC3339 timestamp on a node; the node
	// termination controller parses it as the deadline after which the node is tainted
	// for non-graceful (out-of-service) shutdown.
	NodeExpirationTimeAnnotationKey = Group + "/node-expiration-time"
)

// Karpenter specific finalizers
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/v1beta1/nodepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ type Disruption struct {
// +kubebuilder:validation:MaxItems=50
// +optional
Budgets []Budget `json:"budgets,omitempty" hash:"ignore"`
// TerminationGracePeriod is the duration the controller will wait before forcefully terminating a node, measured from when deletion is first initiated.
// Once the GracePeriod has expired, all pods on the node will be shut down using the official non-graceful shutdown taint.
// If a pod would be terminated without being granted its full terminationGracePeriodSeconds prior to the node timeout,
// that pod will be deleted at T = node timeout - pod terminationGracePeriodSeconds.
//
// Warning: this bypasses any PDB or terminationGracePeriodSeconds value set for a Pod.
// Requires: K8s 1.26 or higher: https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown
//
// This field is intended to be used by cluster administrators to enforce that nodes can be cycled within a given time period.
// It can also be used to allow maximum time limits for long-running jobs which can delay node termination with preStop hooks.
// If left undefined, the controller will wait indefinitely for pods to be drained.
// +optional
TerminationGracePeriod *metav1.Duration `json:"terminationGracePeriod,omitempty"`
}

// Budget defines when Karpenter will restrict the
Expand Down
11 changes: 11 additions & 0 deletions pkg/apis/v1beta1/taints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import v1 "k8s.io/api/core/v1"
const (
	// DisruptionTaintKey is the node taint key Karpenter applies while disrupting a node.
	DisruptionTaintKey = Group + "/disruption"
	// DisruptingNoScheduleTaintValue is the value carried by the NoSchedule disruption taint.
	DisruptingNoScheduleTaintValue = "disrupting"

	// DisruptionNonGracefulShutdownValue is the value used with the upstream
	// out-of-service taint (v1.TaintNodeOutOfService, NoExecute) to trigger the
	// Kubernetes non-graceful node shutdown flow; see the DisruptionNonGracefulShutdown
	// taint variable below.
	DisruptionNonGracefulShutdownValue = "nodeshutdown"
)

var (
Expand All @@ -32,6 +34,15 @@ var (
Effect: v1.TaintEffectNoSchedule,
Value: DisruptingNoScheduleTaintValue,
}

// DisruptionNonGracefulShutdown is used by the deprovisioning controller to forcefully
// shut down a node. This does not respect graceful termination of any pods on the node.
// https://kubernetes.io/docs/concepts/architecture/nodes/#non-graceful-node-shutdown
DisruptionNonGracefulShutdown = v1.Taint{
Key: v1.TaintNodeOutOfService,
Effect: v1.TaintEffectNoExecute,
Value: DisruptionNonGracefulShutdownValue,
}
)

func IsDisruptingTaint(taint v1.Taint) bool {
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ func NewControllers(
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cluster),
informer.NewNodeClaimController(kubeClient, cluster),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue), recorder),
termination.NewController(kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
metricspod.NewController(kubeClient),
metricsnodepool.NewController(kubeClient),
metricsnode.NewController(cluster),
nodepoolcounter.NewController(kubeClient, cluster),
nodeclaimconsistency.NewController(clock, kubeClient, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider),
nodeclaimtermination.NewController(kubeClient, cloudProvider),
nodeclaimtermination.NewController(kubeClient, cloudProvider, recorder),
nodeclaimdisruption.NewController(clock, kubeClient, cluster, cloudProvider),
leasegarbagecollection.NewController(kubeClient),
}
Expand Down
41 changes: 36 additions & 5 deletions pkg/controllers/disruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"bytes"
"context"
"fmt"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -130,17 +132,21 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
// Attempt different disruption methods. We'll only let one method perform an action
for _, m := range c.methods {
c.recordRun(fmt.Sprintf("%T", m))
success, err := c.disrupt(ctx, m)
_, err := c.disrupt(ctx, m)
if err != nil {
return reconcile.Result{}, fmt.Errorf("disrupting via %q, %w", m.Type(), err)
}
if success {
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}
// ensure we attempt all disruption methods before completing reconciliation
// if success {
// return reconcile.Result{RequeueAfter: controller.Immediately}, nil
// }
}

// All methods did nothing, so return nothing to do
return reconcile.Result{RequeueAfter: pollingPeriod}, nil
// return reconcile.Result{RequeueAfter: pollingPeriod}, nil

// loop complete, requeue immediately
return reconcile.Result{RequeueAfter: controller.Immediately}, nil
}

func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) {
Expand Down Expand Up @@ -194,6 +200,31 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
}

sleepTime := os.Getenv("DISRUPTION_ANNOTATION_SLEEP_SECONDS")
if len(sleepTime) > 0 {
sleepSeconds, err := strconv.Atoi(sleepTime)
if err == nil {
logging.FromContext(ctx).With("command-id", commandID).With("sleep", sleepTime).Infof("waiting for do-not-disrupt or do-not-consolidate pods that may have scheduled...")
time.Sleep(time.Duration(sleepSeconds * int(time.Second)))
} else {
logging.FromContext(ctx).With("command-id", commandID).With("sleep", sleepTime).With("variable", "DISRUPTION_ANNOTATION_SLEEP_SECONDS").Errorf("parsing disruption sleep time")
}
}

// verify that the nodes we intend to disrupt do not have any do-not-disrupt pods, remove the DoNotSchedule disruption taint if they do
nodesToNotDisrupt, er := state.ValidateNoScheduleTaint(ctx, c.kubeClient, m.Type(), stateNodes...)
if er != nil {
return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, er), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
}

// remove any nodes that had do-not-disrupt pods from the list of nodes we intend to disrupt
for _, n := range nodesToNotDisrupt {
logging.FromContext(ctx).With("command-id", commandID).Infof("avoiding disruption of node %s due to a do-not-disrupt or do-not-consolidate annotation race condition", n.Node.Name)
}
cmd.candidates = lo.Reject(cmd.candidates, func(c *Candidate, _ int) bool {
return lo.Contains(nodesToNotDisrupt, c.StateNode)
})

var nodeClaimNames []string
var err error
if len(cmd.replacements) > 0 {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/disruption/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,11 @@ func NewCandidate(ctx context.Context, kubeClient client.Client, recorder events
return nil, fmt.Errorf(`pod %q has "karpenter.sh/do-not-disrupt" annotation`, client.ObjectKeyFromObject(po))
}
}
if pdbKey, ok := pdbs.CanEvictPods(pods); !ok {
recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("PDB %q prevents pod evictions", pdbKey))...)
return nil, fmt.Errorf("pdb %q prevents pod evictions", pdbKey)
}
// if pdbKey, ok := pdbs.CanEvictPods(pods); !ok {
// logging.FromContext(ctx).Infof("ignoring non-evictable pdb: %q", pdbKey)
// // recorder.Publish(disruptionevents.Blocked(node.Node, node.NodeClaim, fmt.Sprintf("PDB %q prevents pod evictions", pdbKey))...)
// // return nil, fmt.Errorf("pdb %q prevents pod evictions", pdbKey)
// }
return &Candidate{
StateNode: node.DeepCopy(),
instanceType: instanceType,
Expand Down
29 changes: 26 additions & 3 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,20 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res
if err := c.deleteAllNodeClaims(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}
if err := c.terminator.Taint(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node, %w", err)

nodeGracePeriodExpirationTime, err := c.nodeExpirationTime(node)
if err != nil {
return reconcile.Result{}, err
}
if nodeGracePeriodExpirationTime != nil && time.Now().After(*nodeGracePeriodExpirationTime) {
if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNonGracefulShutdown); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", v1.TaintNodeOutOfService, err)
}
}
if err := c.terminator.Taint(ctx, node, v1beta1.DisruptionNoScheduleTaint); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", v1beta1.DisruptionTaintKey, err)
}
if err := c.terminator.Drain(ctx, node); err != nil {
if err := c.terminator.Drain(ctx, node, nodeGracePeriodExpirationTime); err != nil {
if !terminator.IsNodeDrainError(err) {
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
Expand All @@ -102,6 +112,7 @@ func (c *Controller) Finalize(ctx context.Context, node *v1.Node) (reconcile.Res
return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
}
}

return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
// Be careful when removing this delete call in the Node termination flow
Expand Down Expand Up @@ -157,6 +168,18 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error {
return nil
}

// nodeExpirationTime returns the node's grace-period deadline, read from the
// NodeExpirationTimeAnnotationKey annotation and parsed as an RFC3339 timestamp.
// It returns (nil, nil) when the annotation is absent, and a wrapped error when
// the annotation value cannot be parsed.
func (c *Controller) nodeExpirationTime(node *v1.Node) (*time.Time, error) {
	raw, ok := node.ObjectMeta.Annotations[v1beta1.NodeExpirationTimeAnnotationKey]
	if !ok {
		// No expiration deadline recorded on this node.
		return nil, nil
	}
	parsed, err := time.Parse(time.RFC3339, raw)
	if err != nil {
		return nil, fmt.Errorf("parsing %s annotation, %w", v1beta1.NodeExpirationTimeAnnotationKey, err)
	}
	return &parsed, nil
}

func (c *Controller) Builder(_ context.Context, m manager.Manager) operatorcontroller.Builder {
return operatorcontroller.Adapt(controllerruntime.
NewControllerManagedBy(m).
Expand Down
Loading