Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 37 additions & 23 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2657,37 +2657,41 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
sort.Sort(sliceutils.PodsByCreationTime(pods))
var pendingResizes []types.UID
for _, pod := range pods {
// Always add the pod to the pod manager. Kubelet relies on the pod
// manager as the source of truth for the desired state. If a pod does
// not exist in the pod manager, it means that it has been deleted in
// the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)

kl.podCertificateManager.TrackPod(context.TODO(), pod)

pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
if pod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID)
// Handle mirror pods separately. Mirror pods don't go through admission
// and are always added to the pod manager.
if kubetypes.IsMirrorPod(pod) {
kl.podManager.AddPod(pod)
kl.podCertificateManager.TrackPod(context.TODO(), pod)

// Look up the static pod this mirror pod corresponds to
staticPod, ok := kl.podManager.GetPodByMirrorPod(pod)
if !ok || staticPod == nil {
klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(pod), "mirrorPodUID", pod.UID)
continue
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
Pod: staticPod,
MirrorPod: pod,
UpdateType: kubetypes.SyncPodUpdate,
StartTime: start,
})
continue
}

// For regular and static pods, perform admission check BEFORE adding
// to podManager. This ensures rejected pods are never added to podManager
// and cannot affect resource accounting for subsequent pod admissions.
// See: https://github.com/kubernetes/kubernetes/issues/135296
//
// Only go through the admission process if the pod is not requested
// for termination by another part of the kubelet. If the pod is already
// using resources (previously admitted), the pod worker is going to be
// shutting it down. If the pod hasn't started yet, we know that when
// the pod worker is invoked it will also avoid setting up the pod, so
// we simply avoid doing any work.
// We also do not try to admit the pod that is already in terminated state.
if !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase) {
podRequiresAdmission := !kl.podWorkers.IsPodTerminationRequested(pod.UID) && !podutil.IsPodPhaseTerminal(pod.Status.Phase)
if podRequiresAdmission {
// Check if we can admit the pod; if not, reject it.
// We failed pods that we rejected, so activePods include all admitted
// pods that are alive.
Expand All @@ -2700,15 +2704,25 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
recordAdmissionRejection(reason)
continue
}
}

if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Backfill the queue of pending resizes, but only after all the pods have
// been added. This ensures that no resizes get resolved until all the
// existing pods are added.
_, updatedFromAllocation := kl.allocationManager.UpdatePodFromAllocation(pod)
if updatedFromAllocation {
pendingResizes = append(pendingResizes, pod.UID)
}
// Admission passed (or pod is terminating/terminal). Now add to podManager.
// Kubelet relies on the pod manager as the source of truth for the desired
// state. If a pod does not exist in the pod manager, it means that it has
// been deleted in the apiserver and no action (other than cleanup) is required.
kl.podManager.AddPod(pod)
kl.podCertificateManager.TrackPod(context.TODO(), pod)

// Get the mirror pod for static pods (if one exists)
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)

if podRequiresAdmission && utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// Backfill the queue of pending resizes, but only after all the pods have
// been added. This ensures that no resizes get resolved until all the
// existing pods are added.
_, updatedFromAllocation := kl.allocationManager.UpdatePodFromAllocation(pod)
if updatedFromAllocation {
pendingResizes = append(pendingResizes, pod.UID)
}
}
kl.podWorkers.UpdatePod(UpdatePodOptions{
Expand Down
109 changes: 109 additions & 0 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2636,6 +2636,115 @@ func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
checkPodStatus(t, kl, podToAdmit, v1.PodPending)
}

// TestHandlePodAdditionsRemovesRejectedPodsFromPodManager verifies that pods rejected
// during admission are removed from the podManager to prevent them from being
// included in resource accounting for subsequent pod admissions.
//
// This test reproduces the scenario from https://github.com/kubernetes/kubernetes/issues/135296:
// - Node has 100m CPU allocatable
// - Pod 1 requests 1000m CPU (way over capacity), gets rejected
// - Pod 2 requests 50m CPU (fits in capacity), should be admitted
func TestHandlePodAdditionsRemovesRejectedPodsFromPodManager(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
kl.nodeLister = testNodeLister{nodes: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}}

// Pod 1: Requests way more CPU than the node has - will be rejected.
podOverCapacity := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "over-capacity",
Name: "pod-over-capacity",
Namespace: "foo",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
},
},
},
},
},
}

// Pod 2: Requests a small amount of CPU that fits - should be admitted.
podFits := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "fits",
Name: "pod-fits",
Namespace: "foo",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "container",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("50m"),
},
},
},
},
},
}

// Process both pods. Pod 1 should be rejected, Pod 2 should be admitted.
kl.HandlePodAdditions([]*v1.Pod{podOverCapacity, podFits})

// Verify that the rejected pod (over capacity) is not in the podManager.
// Without the fix, https://github.com/kubernetes/kubernetes/pull/135298
// the rejected pod remains in podManager.GetPods(), which is used by GetActivePods()
// for resource accounting. Although filterOutInactivePods() filters out pods with
// Failed status, there's a race window where the status update may not be visible
// to other goroutines, causing the rejected pod's resources to be counted (as seen
// in the issue https://github.com/kubernetes/kubernetes/issues/135296
_, found := kl.podManager.GetPodByUID(podOverCapacity.UID)
if found {
t.Errorf("Rejected pod %s should have been removed from podManager", podOverCapacity.Name)
}

// Verify that the admitted pod is still in the podManager.
_, found = kl.podManager.GetPodByUID(podFits.UID)
if !found {
t.Errorf("Admitted pod %s should still be in podManager", podFits.Name)
}

// Calculate the total CPU requested by pods in podManager (before filtering).
allPodsInManager := kl.podManager.GetPods()
nodeInfo := schedulerframework.NewNodeInfo(allPodsInManager...)
usedMilliCPU := nodeInfo.Requested.MilliCPU

// Only the admitted pod (50m) should be in podManager.
// Without the fix, the rejected pod (1000m) would also be included, totaling 1050m.
expectedMilliCPU := int64(50)
if usedMilliCPU != expectedMilliCPU {
t.Errorf("Resource accounting from podManager.GetPods() is incorrect: "+
"used CPU = %dm, expected %dm. The rejected pod's resources are being counted.",
usedMilliCPU, expectedMilliCPU)
}

// Verify the rejected pod's status shows it failed.
checkPodStatus(t, kl, podOverCapacity, v1.PodFailed)

// Verify the admitted pod is pending (not rejected).
checkPodStatus(t, kl, podFits, v1.PodPending)
}

func TestPodResourceAllocationReset(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)
testKubelet := newTestKubelet(t, false)
Expand Down