29 changes: 27 additions & 2 deletions pkg/kubelet/kubelet.go
@@ -229,6 +229,10 @@ const (

// instrumentationScope is the name of OpenTelemetry instrumentation scope
instrumentationScope = "k8s.io/kubernetes/pkg/kubelet"

// PodRejectionMessagePrefix is the prefix used in status messages to identify a
// pod that was rejected by the kubelet and should never enter the pod worker pipeline.
PodRejectionMessagePrefix = "Pod was rejected: "
)

var (
@@ -2411,11 +2415,11 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
// and updates the pod to the failed phase in the status manager.
func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) {
kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message)
kl.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{
kl.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{
QOSClass: v1qos.GetPodQOS(pod), // keep it as is
Phase: v1.PodFailed,
Reason: reason,
Message: "Pod was rejected: " + message})
Message: PodRejectionMessagePrefix + message})
}

func recordAdmissionRejection(reason string) {
@@ -2711,6 +2715,15 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
}
}
}

// Skip pods rejected during this kubelet session.
// After restart, previously rejected pods may enter pod_workers, but they
// will be immediately marked as terminated because their pod cache is empty
// (no containers were ever created), so they won't affect resource accounting.
if kl.statusManager.IsPodRejected(pod.UID) {
continue
}

kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
@@ -2761,6 +2774,18 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
}
}

// Skip rejected pods during UPDATE operations.
// Pods always arrive as ADD before UPDATE (enforced by the config layer).
// If a pod failed admission in HandlePodAdditions, it was rejected and
// never sent to pod_workers. UPDATE operations for such pods should be
// ignored because:
// 1. No containers exist - nothing to sync or clean up
// 2. The pod is already in terminal Failed state
// 3. REMOVE operation will eventually clean up podManager
if kl.statusManager.IsPodRejected(pod.UID) {
continue
}

kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
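For orientation, a minimal self-contained sketch of the gate this diff introduces: the status manager remembers which pods were rejected at admission, and the pod-lifecycle handlers consult that flag before handing a pod to the pod workers. The rejectionTracker and handleAdditions names below are simplified stand-ins for illustration, not the kubelet's real types.

package main

import (
	"fmt"
	"sync"
)

// rejectionTracker is a simplified stand-in for the status manager's
// rejection bookkeeping: it records which pod UIDs were rejected at admission.
type rejectionTracker struct {
	mu       sync.RWMutex
	rejected map[string]bool
}

func newRejectionTracker() *rejectionTracker {
	return &rejectionTracker{rejected: make(map[string]bool)}
}

// SetPodRejected marks a pod as rejected; in the kubelet this also caches
// the failed PodStatus.
func (t *rejectionTracker) SetPodRejected(uid string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.rejected[uid] = true
}

// IsPodRejected reports whether the pod was rejected during this session.
func (t *rejectionTracker) IsPodRejected(uid string) bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.rejected[uid]
}

// handleAdditions mirrors the HandlePodAdditions/HandlePodUpdates pattern:
// rejected pods are recorded once and then skipped, so they never reach the
// pod workers.
func handleAdditions(t *rejectionTracker, uids []string, admit func(string) bool) []string {
	var dispatched []string
	for _, uid := range uids {
		if !admit(uid) {
			t.SetPodRejected(uid) // admission failed: record and stop here
			continue
		}
		if t.IsPodRejected(uid) {
			continue // previously rejected in this session: never dispatch
		}
		dispatched = append(dispatched, uid) // would go to podWorkers.UpdatePod
	}
	return dispatched
}

func main() {
	t := newRejectionTracker()
	admit := func(uid string) bool { return uid != "too-big" }
	fmt.Println(handleAdditions(t, []string{"ok", "too-big"}, admit)) // [ok]
	// A later UPDATE for the rejected pod is ignored even if admission would now pass.
	fmt.Println(handleAdditions(t, []string{"too-big"}, func(string) bool { return true })) // []
}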
160 changes: 158 additions & 2 deletions pkg/kubelet/kubelet_test.go
@@ -1760,7 +1760,12 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
}

// Let the pod worker set the status to failed after this sync.
kubelet.HandlePodUpdates(pods)
// Use HandlePodSyncs to test active deadline behavior.
// This bypasses admission and sends pods directly to the pod worker.
// HandlePodUpdates requires the pod to already be known to the worker,
// and HandlePodAdditions would reject the pod due to test node capacity.
kubelet.podManager.SetPods(pods)
kubelet.HandlePodSyncs(pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
assert.Equal(t, v1.PodFailed, status.Phase)
@@ -1809,8 +1814,11 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
}},
}

// Use HandlePodSyncs to test active deadline behavior.
// This bypasses admission and sends pods directly to the pod worker.
// HandlePodUpdates requires the pod to already be known to the worker.
kubelet.podManager.SetPods(pods)
kubelet.HandlePodUpdates(pods)
kubelet.HandlePodSyncs(pods)
status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
assert.NotEqual(t, v1.PodFailed, status.Phase)
@@ -4592,3 +4600,151 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) {
})
}
}

func TestIsPodRejected(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet

// Helper to create a unique pod for each test case.
podCounter := 0
newUniquePod := func() *v1.Pod {
podCounter++
pod := newTestPods(1)[0]
pod.UID = types.UID(fmt.Sprintf("test-pod-uid-%d", podCounter))
pod.Name = fmt.Sprintf("test-pod-%d", podCounter)
return pod
}

testCases := []struct {
name string
setupPod func() *v1.Pod
setupLocalStatus func(*v1.Pod)
expectedRejected bool
description string
}{
{
name: "no local status",
setupPod: func() *v1.Pod {
return newUniquePod()
},
setupLocalStatus: func(pod *v1.Pod) {},
expectedRejected: false,
description: "pod with no local status should not be considered rejected",
},
{
name: "local status set via SetPodStatus",
setupPod: func() *v1.Pod {
return newUniquePod()
},
setupLocalStatus: func(pod *v1.Pod) {
kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{
Phase: v1.PodFailed,
Reason: "ImagePullBackOff",
Message: "Failed to pull image",
})
},
expectedRejected: false,
description: "pod with status set via SetPodStatus should not be considered rejected",
},
{
name: "rejected pod via SetPodRejected",
setupPod: func() *v1.Pod {
return newUniquePod()
},
setupLocalStatus: func(pod *v1.Pod) {
kubelet.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{
Phase: v1.PodFailed,
Reason: "OutOfcpu",
Message: PodRejectionMessagePrefix + "insufficient CPU",
})
},
expectedRejected: true,
description: "pod rejected via SetPodRejected should be considered rejected",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
pod := tc.setupPod()
tc.setupLocalStatus(pod)

result := kubelet.statusManager.IsPodRejected(pod.UID)
assert.Equal(t, tc.expectedRejected, result, tc.description)
})
}
}

func TestRejectPodUsesConstant(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)

defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet

pod := newTestPods(1)[0]
reason := "OutOfcpu"
message := "insufficient CPU resources"

kubelet.rejectPod(pod, reason, message)

status, found := kubelet.statusManager.GetPodStatus(pod.UID)
assert.True(t, found, "expected to find status for rejected pod")
assert.Equal(t, v1.PodFailed, status.Phase)
assert.Equal(t, reason, status.Reason)

expectedMessage := PodRejectionMessagePrefix + message
assert.Equal(t, expectedMessage, status.Message, "rejectPod should use PodRejectionMessagePrefix constant")
assert.True(t, strings.HasPrefix(status.Message, PodRejectionMessagePrefix), "rejection message should start with prefix")
}

func TestHandlePodUpdatesSkipsRejectedPods(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet

// Create test pods
rejectedPod := newTestPods(1)[0]
evictedPod := newTestPods(1)[0]
evictedPod.UID = "evicted-pod-uid"
evictedPod.Name = "evicted-pod"
now := metav1.NewTime(time.Now())
evictedPod.DeletionTimestamp = &now

normalPod := newTestPods(1)[0]
normalPod.UID = "normal-pod-uid"
normalPod.Name = "normal-pod"

// Setup rejected pod status
kubelet.statusManager.SetPodRejected(klog.TODO(), rejectedPod, v1.PodStatus{
Phase: v1.PodFailed,
Reason: "OutOfcpu",
Message: PodRejectionMessagePrefix + "insufficient CPU",
})

// Setup evicted pod status (has rejection message but also deletion timestamp)
kubelet.statusManager.SetPodRejected(klog.TODO(), evictedPod, v1.PodStatus{
Phase: v1.PodFailed,
Reason: "Evicted",
Message: PodRejectionMessagePrefix + "evicted for testing",
})

// Add all pods to podManager
kubelet.podManager.AddPod(rejectedPod)
kubelet.podManager.AddPod(evictedPod)
kubelet.podManager.AddPod(normalPod)

// Call HandlePodUpdates - this should skip rejected pods but process evicted and normal pods
kubelet.HandlePodUpdates([]*v1.Pod{rejectedPod, evictedPod, normalPod})

// Wait a short time for processing
time.Sleep(10 * time.Millisecond)

// Verify by checking if the pods are known to pod workers
// Rejected pods should not be known to pod workers since they were skipped
isRejectedKnown := kubelet.podWorkers.IsPodTerminationRequested(rejectedPod.UID)
assert.False(t, isRejectedKnown, "rejected pod should not be known to pod workers")

// Note: We can't easily test that evicted and normal pods were processed
// without more complex setup, but the main thing we're testing is that
// rejected pods are correctly skipped because they are not known to pod_workers
}
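TestRejectPodUsesConstant above checks the prefix contract; for illustration, a small sketch (the wasRejectedByMessage helper is hypothetical and not part of this change) of how a consumer of the pod status could match on that shared prefix:

package main

import (
	"fmt"
	"strings"
)

// PodRejectionMessagePrefix matches the constant added in pkg/kubelet/kubelet.go.
const PodRejectionMessagePrefix = "Pod was rejected: "

// wasRejectedByMessage is a hypothetical helper showing how a status consumer
// could recognize kubelet-rejected pods by the shared message prefix.
func wasRejectedByMessage(statusMessage string) bool {
	return strings.HasPrefix(statusMessage, PodRejectionMessagePrefix)
}

func main() {
	fmt.Println(wasRejectedByMessage(PodRejectionMessagePrefix + "insufficient CPU")) // true
	fmt.Println(wasRejectedByMessage("Failed to pull image"))                         // false
}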
41 changes: 36 additions & 5 deletions pkg/kubelet/status/status_manager.go
@@ -59,6 +59,9 @@ type versionedPodStatus struct {
// True if the status is generated at the end of SyncTerminatedPod, or after it is completed.
podIsFinished bool

// True if the pod was rejected during admission.
podIsRejected bool

status v1.PodStatus
}

@@ -179,6 +182,12 @@ type Manager interface {
// BackfillPodResizeConditions backfills the status manager's resize conditions by reading them from the
// provided pods' statuses.
BackfillPodResizeConditions(pods []*v1.Pod)

// SetPodRejected caches the status for a pod and also marks it as rejected.
SetPodRejected(logger klog.Logger, pod *v1.Pod, status v1.PodStatus)

// IsPodRejected returns true if the pod was rejected during admission.
IsPodRejected(uid types.UID) bool
}

const syncPeriod = 10 * time.Second
@@ -407,7 +416,17 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) {
func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
m.setPodStatusLocked(logger, pod, status, false)
}

func (m *manager) SetPodRejected(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) {
m.podStatusesLock.Lock()
defer m.podStatusesLock.Unlock()
m.setPodStatusLocked(logger, pod, status, true)
}

// setPodStatusLocked updates the pod status cache. The caller must hold podStatusesLock.
func (m *manager) setPodStatusLocked(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, podIsRejected bool) {
// Make sure we're caching a deep copy.
status = *status.DeepCopy()

@@ -417,7 +436,14 @@ func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodSta
// Force a status update if deletion timestamp is set. This is necessary
// because if the pod is in the non-running state, the pod worker still
// needs to be able to trigger an update and/or deletion.
m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false)
m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false, podIsRejected)
}

func (m *manager) IsPodRejected(uid types.UID) bool {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
status, ok := m.podStatuses[types.UID(m.podManager.TranslatePodUID(uid))]
return ok && status.podIsRejected
}

func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
@@ -480,7 +506,7 @@ func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, co
allContainerStatuses := append(status.InitContainerStatuses, status.ContainerStatuses...)
updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(pod, &oldStatus.status, status.Conditions, allContainerStatuses, status.Phase))
updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(pod, &oldStatus.status, allContainerStatuses, status.Phase))
m.updateStatusInternal(logger, pod, status, false, false)
m.updateStatusInternal(logger, pod, status, false, false, false)
}

func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
@@ -522,7 +548,7 @@ func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, cont
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
containerStatus.Started = &started

m.updateStatusInternal(logger, pod, status, false, false)
m.updateStatusInternal(logger, pod, status, false, false, false)
}

func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
@@ -616,7 +642,7 @@ func (m *manager) TerminatePod(logger klog.Logger, pod *v1.Pod) {
}

logger.V(5).Info("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID)
m.updateStatusInternal(logger, pod, status, true, true)
m.updateStatusInternal(logger, pod, status, true, true, false)
}

// hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which
Expand Down Expand Up @@ -752,7 +778,7 @@ func checkContainerStateTransition(oldStatuses, newStatuses *v1.PodStatus, podSp
// updateStatusInternal updates the internal status cache, and queues an update to the api server if
// necessary.
// This method IS NOT THREAD SAFE and must be called from a locked function.
func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) {
func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished, podIsRejected bool) {
var oldStatus v1.PodStatus
cachedStatus, isCached := m.podStatuses[pod.UID]
if isCached {
@@ -763,6 +789,10 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v
logger.Info("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error", "pod", klog.KObj(pod))
podIsFinished = true
}
if cachedStatus.podIsRejected && !podIsRejected {
logger.Info("Got unexpected podIsRejected=false, while podIsRejected=true in status cache, programmer error", "pod", klog.KObj(pod))
podIsRejected = true
}
}
} else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
oldStatus = mirrorPod.Status
@@ -855,6 +885,7 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v
podName: pod.Name,
podNamespace: pod.Namespace,
podIsFinished: podIsFinished,
podIsRejected: podIsRejected,
}

// Multiple status updates can be generated before we update the API server,
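To make the cache semantics concrete, a minimal sketch of the sticky podIsRejected flag, assuming a pared-down cache in place of the real versionedPodStatus plumbing; it mirrors the programmer-error guard added to updateStatusInternal above.

package main

import "fmt"

// cachedStatus is a pared-down stand-in for versionedPodStatus, keeping only
// the fields relevant to rejection tracking.
type cachedStatus struct {
	phase         string
	podIsRejected bool
}

type statusCache map[string]cachedStatus

// set mirrors updateStatusInternal's guard: once a pod is cached as rejected,
// a later update cannot silently clear the flag.
func (c statusCache) set(uid, phase string, podIsRejected bool) {
	if old, ok := c[uid]; ok && old.podIsRejected && !podIsRejected {
		// In the kubelet this logs a "programmer error" and keeps the flag set.
		podIsRejected = true
	}
	c[uid] = cachedStatus{phase: phase, podIsRejected: podIsRejected}
}

// isPodRejected reports whether a cached entry exists and is marked rejected,
// analogous to the manager's IsPodRejected.
func (c statusCache) isPodRejected(uid string) bool {
	s, ok := c[uid]
	return ok && s.podIsRejected
}

func main() {
	c := statusCache{}
	c.set("pod-a", "Failed", true)        // SetPodRejected path
	c.set("pod-a", "Failed", false)       // e.g. a later SetPodStatus call
	fmt.Println(c.isPodRejected("pod-a")) // true: the flag stays set
}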