From 6f433043df5eef5a688e7011754b4d3dac2386c7 Mon Sep 17 00:00:00 2001 From: Pablo Acevedo Montserrat Date: Fri, 30 Jan 2026 12:07:50 +0100 Subject: [PATCH 1/5] kubelet: Fix race condition in pod admission kubelet: Prevent rejected pods from entering pod_workers When a pod fails admission, it iss marked as Failed but may still receive UPDATE operations from API server resyncs. If these updates cause the pod to enter pod_workers, it gets terminatingAt set and is counted as "active" by filterOutInactivePods until termination completes. This causes incorrect resource accounting during admission of subsequent pods. In order to avoid adding rejected pods into resource accounting, these should never enter pod_workers: They are in a terminal state and they will not run any containers that require cleanup. There is only one way of identifying a rejected pod, which is a special status that the kubelet sets for them. This is the only distinctive feature the kubelet may use to filter them out of pod_workers if they arrive through an UPDATE operation. As kubelet config loops always processes pods a ADD before UPDATE, when the UPDATE comes the pod must already be in pod_workers if it was admitted, so it is safe to assume that if the pod is not already in pod_workers it was rejected. In the event of a kubelet restart, all pods arrive as ADD again, so there is a need to check for the status message if the pod is in a terminal state (meaning it was rejected before the restart). Signed-off-by: Pablo Acevedo Montserrat --- pkg/kubelet/kubelet.go | 50 +++++++++++++++++++++++++++++++++++++- pkg/kubelet/pod_workers.go | 14 +++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e72cf3d9b203b..ac18579faa364 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -229,6 +229,10 @@ const ( // instrumentationScope is the name of OpenTelemetry instrumentation scope instrumentationScope = "k8s.io/kubernetes/pkg/kubelet" + + // PodRejectionMessagePrefix is the prefix used in status messages to identify a + // pod that was rejected by the kubelet and should never enter the pod worker pipeline. + PodRejectionMessagePrefix = "Pod was rejected: " ) var ( @@ -2415,7 +2419,28 @@ func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { QOSClass: v1qos.GetPodQOS(pod), // keep it as is Phase: v1.PodFailed, Reason: reason, - Message: "Pod was rejected: " + message}) + Message: PodRejectionMessagePrefix + message}) +} + +// isLocallyRejected checks if a pod has already been rejected. These pods +// have never had containers created, have a Failed phase and a specific +// status that is forced by rejectPod() call. The only distinctive feature +// they have from other pods is the presence of a specific status: empty +// and having a message with the prefix PodRejectionMessagePrefix. +// This function looks for the message in both statusManager (current +// session rejection) and, if not found, in status from API server (rejection +// that was already synced to the API in case of a kubelet restart). +func (kl *Kubelet) isLocallyRejected(pod *v1.Pod) bool { + localStatus, found := kl.statusManager.GetPodStatus(pod.UID) + if found && strings.HasPrefix(localStatus.Message, PodRejectionMessagePrefix) { + return true + } + + if strings.HasPrefix(pod.Status.Message, PodRejectionMessagePrefix) { + return true + } + + return false } func recordAdmissionRejection(reason string) { @@ -2711,6 +2736,16 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } } + + // Skip previously rejected pods on kubelet restart. + // When kubelet restarts, all pods arrive as ADD operations. Pods that were + // previously rejected have Failed phase and skip the admission check above + // (due to IsPodPhaseTerminal check). Without this filter, they would enter + // pod_workers. + if kl.isLocallyRejected(pod) { + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, @@ -2761,6 +2796,19 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { } } + // Skip rejected pods during UPDATE operations. + // + // Pods always arrive as ADD before UPDATE (guaranteed by the config layer). + // If a pod failed admission in HandlePodAdditions, it was never sent to + // pod_workers and won't be known to the worker. UPDATE operations for such + // pods should be ignored because: + // 1. No containers exist - nothing to sync or clean up + // 2. The pod is already in terminal Failed state + // 3. REMOVE operation will eventually clean up podManager + if !kl.podWorkers.IsPodKnownToWorker(pod.UID) { + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 68d47ff5bfaa0..6b8a84f224d59 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -183,6 +183,13 @@ type PodWorkers interface { // Intended for use by the kubelet config loops, but not subsystems, which should // use ShouldPod*(). IsPodKnownTerminated(uid types.UID) bool + // IsPodKnownToWorker returns true if the pod has ever been sent to the pod + // worker via UpdatePod (i.e., has an entry in podSyncStatuses). A pod that + // did not enter the pod worker pipeline is a pod that was rejected and + // therefore should not be processed. No containers exist for these pods + // and they are already in a terminal Failed state. + // Intended for use by the kubelet config loops, but not subsystems. + IsPodKnownToWorker(uid types.UID) bool // CouldHaveRunningContainers returns true before the pod workers have synced, // once the pod workers see the pod (syncPod could be called), and returns false // after the pod has been terminated (running containers guaranteed stopped). @@ -649,6 +656,13 @@ func (p *podWorkers) IsPodKnownTerminated(uid types.UID) bool { return false } +func (p *podWorkers) IsPodKnownToWorker(uid types.UID) bool { + p.podLock.Lock() + defer p.podLock.Unlock() + _, ok := p.podSyncStatuses[uid] + return ok +} + func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool { p.podLock.Lock() defer p.podLock.Unlock() From 8c2306263051b9984cbcb1bb8007f6e2269d74fe Mon Sep 17 00:00:00 2001 From: Pablo Acevedo Montserrat Date: Fri, 30 Jan 2026 13:37:10 +0100 Subject: [PATCH 2/5] kubelet: Add unit tests for rejected pods new behavior Signed-off-by: Pablo Acevedo Montserrat --- pkg/kubelet/kubelet_test.go | 198 +++++++++++++++++++++++++++++++- pkg/kubelet/pod_workers_test.go | 6 + 2 files changed, 202 insertions(+), 2 deletions(-) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 7746458001108..6861d7dbf2c16 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1760,7 +1760,12 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { } // Let the pod worker sets the status to fail after this sync. - kubelet.HandlePodUpdates(pods) + // Use HandlePodSyncs to test active deadline behavior. + // This bypasses admission and sends pods directly to the pod worker. + // HandlePodUpdates requires the pod to already be known to the worker, + // and HandlePodAdditions would reject the pod due to test node capacity. + kubelet.podManager.SetPods(pods) + kubelet.HandlePodSyncs(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.Equal(t, v1.PodFailed, status.Phase) @@ -1809,8 +1814,11 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { }}, } + // Use HandlePodSyncs to test active deadline behavior. + // This bypasses admission and sends pods directly to the pod worker. + // HandlePodUpdates requires the pod to already be known to the worker. kubelet.podManager.SetPods(pods) - kubelet.HandlePodUpdates(pods) + kubelet.HandlePodSyncs(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.NotEqual(t, v1.PodFailed, status.Phase) @@ -4592,3 +4600,189 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { }) } } + +func TestIsLocallyRejected(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + pod := newTestPods(1)[0] + + testCases := []struct { + name string + setupPod func() *v1.Pod + setupLocalStatus func(*v1.Pod) + expectedRejected bool + description string + }{ + { + name: "no local status", + setupPod: func() *v1.Pod { + return pod + }, + setupLocalStatus: func(pod *v1.Pod) {}, + expectedRejected: false, + description: "pod with no local status should not be considered rejected", + }, + { + name: "local status without rejection message", + setupPod: func() *v1.Pod { + return pod + }, + setupLocalStatus: func(pod *v1.Pod) { + kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "ImagePullBackOff", + Message: "Failed to pull image", + }) + }, + expectedRejected: false, + description: "pod with Failed status but no rejection message should not be considered rejected", + }, + { + name: "rejected pod without deletion timestamp", + setupPod: func() *v1.Pod { + return pod + }, + setupLocalStatus: func(pod *v1.Pod) { + kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "OutOfcpu", + Message: PodRejectionMessagePrefix + "insufficient CPU", + }) + }, + expectedRejected: true, + description: "rejected pod without deletion timestamp should be considered rejected", + }, + { + name: "pod with deletion timestamp and rejection message", + setupPod: func() *v1.Pod { + podCopy := pod + now := metav1.NewTime(time.Now()) + podCopy.DeletionTimestamp = &now + return podCopy + }, + setupLocalStatus: func(pod *v1.Pod) { + kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "OutOfcpu", + Message: PodRejectionMessagePrefix + "insufficient CPU", + }) + }, + expectedRejected: true, + description: "pod with rejection message is considered rejected regardless of deletion timestamp", + }, + { + name: "pod with partial rejection message prefix", + setupPod: func() *v1.Pod { + return pod + }, + setupLocalStatus: func(pod *v1.Pod) { + kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "OutOfMemory", + Message: "Pod was reject", + }) + }, + expectedRejected: false, + description: "pod with partial rejection message prefix should not be considered rejected", + }, + { + name: "pod with rejection message in the middle", + setupPod: func() *v1.Pod { + return pod + }, + setupLocalStatus: func(pod *v1.Pod) { + kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "OutOfMemory", + Message: "Container failed because " + PodRejectionMessagePrefix + "insufficient resources", + }) + }, + expectedRejected: false, + description: "pod with rejection message not at start should not be considered rejected", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pod := tc.setupPod() + tc.setupLocalStatus(pod) + + result := kubelet.isLocallyRejected(pod) + assert.Equal(t, tc.expectedRejected, result, tc.description) + }) + } +} + +func TestRejectPodUsesConstant(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + + pod := newTestPods(1)[0] + reason := "OutOfcpu" + message := "insufficient CPU resources" + + kubelet.rejectPod(pod, reason, message) + + status, found := kubelet.statusManager.GetPodStatus(pod.UID) + assert.True(t, found, "expected to find status for rejected pod") + assert.Equal(t, v1.PodFailed, status.Phase) + assert.Equal(t, reason, status.Reason) + + expectedMessage := PodRejectionMessagePrefix + message + assert.Equal(t, expectedMessage, status.Message, "rejectPod should use PodRejectionMessagePrefix constant") + assert.True(t, strings.HasPrefix(status.Message, PodRejectionMessagePrefix), "rejection message should start with prefix") +} + +func TestHandlePodUpdatesSkipsRejectedPods(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + + // Create test pods + rejectedPod := newTestPods(1)[0] + evictedPod := newTestPods(1)[0] + evictedPod.UID = "evicted-pod-uid" + evictedPod.Name = "evicted-pod" + now := metav1.NewTime(time.Now()) + evictedPod.DeletionTimestamp = &now + + normalPod := newTestPods(1)[0] + normalPod.UID = "normal-pod-uid" + normalPod.Name = "normal-pod" + + // Setup rejected pod status + kubelet.statusManager.SetPodStatus(klog.TODO(), rejectedPod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "OutOfcpu", + Message: PodRejectionMessagePrefix + "insufficient CPU", + }) + + // Setup evicted pod status (has rejection message but also deletion timestamp) + kubelet.statusManager.SetPodStatus(klog.TODO(), evictedPod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "Evicted", + Message: PodRejectionMessagePrefix + "evicted for testing", + }) + + // Add all pods to podManager + kubelet.podManager.AddPod(rejectedPod) + kubelet.podManager.AddPod(evictedPod) + kubelet.podManager.AddPod(normalPod) + + // Call HandlePodUpdates - this should skip rejected pods but process evicted and normal pods + kubelet.HandlePodUpdates([]*v1.Pod{rejectedPod, evictedPod, normalPod}) + + // Wait a short time for processing + time.Sleep(10 * time.Millisecond) + + // Verify by checking if the pods are known to pod workers + // Rejected pods should not be known to pod workers since they were skipped + isRejectedKnown := kubelet.podWorkers.IsPodTerminationRequested(rejectedPod.UID) + assert.False(t, isRejectedKnown, "rejected pod should not be known to pod workers") + + // Note: We can't easily test that evicted and normal pods were processed + // without more complex setup, but the main thing we're testing is that + // rejected pods are correctly skipped because they are not known to pod_workers +} diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index b318ed630f032..a8665bd5b1d4e 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -102,6 +102,12 @@ func (f *fakePodWorkers) IsPodKnownTerminated(uid types.UID) bool { defer f.statusLock.Unlock() return f.terminated[uid] } +func (f *fakePodWorkers) IsPodKnownToWorker(uid types.UID) bool { + f.statusLock.Lock() + defer f.statusLock.Unlock() + // In tests, a pod is known if it's in any of the tracking maps + return f.running[uid] || f.terminating[uid] || f.terminated[uid] +} func (f *fakePodWorkers) CouldHaveRunningContainers(uid types.UID) bool { f.statusLock.Lock() defer f.statusLock.Unlock() From aae13be22e9e06ddf994be558ff016f3e65d3468 Mon Sep 17 00:00:00 2001 From: Pablo Acevedo Montserrat Date: Mon, 2 Feb 2026 10:10:29 +0100 Subject: [PATCH 3/5] kubelet: Simplify rejected pod update logic Signed-off-by: Pablo Acevedo Montserrat --- pkg/kubelet/kubelet.go | 32 +++++--------------------------- pkg/kubelet/kubelet_pods.go | 28 ++++++++++++++++++++++++++++ pkg/kubelet/pod_workers.go | 14 -------------- pkg/kubelet/pod_workers_test.go | 6 ------ 4 files changed, 33 insertions(+), 47 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ac18579faa364..1a62af4fe6467 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2422,27 +2422,6 @@ func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { Message: PodRejectionMessagePrefix + message}) } -// isLocallyRejected checks if a pod has already been rejected. These pods -// have never had containers created, have a Failed phase and a specific -// status that is forced by rejectPod() call. The only distinctive feature -// they have from other pods is the presence of a specific status: empty -// and having a message with the prefix PodRejectionMessagePrefix. -// This function looks for the message in both statusManager (current -// session rejection) and, if not found, in status from API server (rejection -// that was already synced to the API in case of a kubelet restart). -func (kl *Kubelet) isLocallyRejected(pod *v1.Pod) bool { - localStatus, found := kl.statusManager.GetPodStatus(pod.UID) - if found && strings.HasPrefix(localStatus.Message, PodRejectionMessagePrefix) { - return true - } - - if strings.HasPrefix(pod.Status.Message, PodRejectionMessagePrefix) { - return true - } - - return false -} - func recordAdmissionRejection(reason string) { // It is possible that the "reason" label can have high cardinality. // To avoid this metric from exploding, we create an allowlist of known @@ -2797,15 +2776,14 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { } // Skip rejected pods during UPDATE operations. - // - // Pods always arrive as ADD before UPDATE (guaranteed by the config layer). - // If a pod failed admission in HandlePodAdditions, it was never sent to - // pod_workers and won't be known to the worker. UPDATE operations for such - // pods should be ignored because: + // Pods always arrive as ADD before UPDATE (enforced by the config layer). + // If a pod failed admission in HandlePodAdditions, it was rejected and + // never sent to pod_workers. UPDATE operations for such pods should be + // ignored because: // 1. No containers exist - nothing to sync or clean up // 2. The pod is already in terminal Failed state // 3. REMOVE operation will eventually clean up podManager - if !kl.podWorkers.IsPodKnownToWorker(pod.UID) { + if kl.isLocallyRejected(pod) { continue } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 86eeb0bd26dc1..87cdb9cff1433 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1166,6 +1166,34 @@ func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool { return false } +// isLocallyRejected checks if a pod was rejected by this kubelet instance. +// Rejected pods are pods that failed admission (e.g., insufficient resources, +// node affinity mismatch). These pods: +// - Never had containers created +// - Were marked as Failed with PodRejectionMessagePrefix in the status message +// - Should never enter the pod_workers pipeline +// +// This function checks both the local statusManager cache (for current session +// rejections) and the pod.Status from API server (for rejections that survived +// kubelet restart). +func (kl *Kubelet) isLocallyRejected(pod *v1.Pod) bool { + // Check local statusManager cache first (current session rejections). + // This catches pods rejected since the kubelet started. + localStatus, found := kl.statusManager.GetPodStatus(pod.UID) + if found && strings.HasPrefix(localStatus.Message, PodRejectionMessagePrefix) { + return true + } + + // Check API server status (rejections from before kubelet restart). + // When kubelet restarts, the local statusManager is empty, but the rejection + // status was synced to the API server and is available in pod.Status. + if strings.HasPrefix(pod.Status.Message, PodRejectionMessagePrefix) { + return true + } + + return false +} + // removeOrphanedPodStatuses removes obsolete entries in podStatus where // the pod is no longer considered bound to this node. func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 6b8a84f224d59..68d47ff5bfaa0 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -183,13 +183,6 @@ type PodWorkers interface { // Intended for use by the kubelet config loops, but not subsystems, which should // use ShouldPod*(). IsPodKnownTerminated(uid types.UID) bool - // IsPodKnownToWorker returns true if the pod has ever been sent to the pod - // worker via UpdatePod (i.e., has an entry in podSyncStatuses). A pod that - // did not enter the pod worker pipeline is a pod that was rejected and - // therefore should not be processed. No containers exist for these pods - // and they are already in a terminal Failed state. - // Intended for use by the kubelet config loops, but not subsystems. - IsPodKnownToWorker(uid types.UID) bool // CouldHaveRunningContainers returns true before the pod workers have synced, // once the pod workers see the pod (syncPod could be called), and returns false // after the pod has been terminated (running containers guaranteed stopped). @@ -656,13 +649,6 @@ func (p *podWorkers) IsPodKnownTerminated(uid types.UID) bool { return false } -func (p *podWorkers) IsPodKnownToWorker(uid types.UID) bool { - p.podLock.Lock() - defer p.podLock.Unlock() - _, ok := p.podSyncStatuses[uid] - return ok -} - func (p *podWorkers) CouldHaveRunningContainers(uid types.UID) bool { p.podLock.Lock() defer p.podLock.Unlock() diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index a8665bd5b1d4e..b318ed630f032 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -102,12 +102,6 @@ func (f *fakePodWorkers) IsPodKnownTerminated(uid types.UID) bool { defer f.statusLock.Unlock() return f.terminated[uid] } -func (f *fakePodWorkers) IsPodKnownToWorker(uid types.UID) bool { - f.statusLock.Lock() - defer f.statusLock.Unlock() - // In tests, a pod is known if it's in any of the tracking maps - return f.running[uid] || f.terminating[uid] || f.terminated[uid] -} func (f *fakePodWorkers) CouldHaveRunningContainers(uid types.UID) bool { f.statusLock.Lock() defer f.statusLock.Unlock() From 6020be998d23fc3f57f8a0f931ac3e1b7c5334b3 Mon Sep 17 00:00:00 2001 From: Pablo Acevedo Montserrat Date: Tue, 3 Feb 2026 17:18:30 +0100 Subject: [PATCH 4/5] kubelet: optimize UPDATE checks Signed-off-by: Pablo Acevedo Montserrat --- pkg/kubelet/kubelet.go | 4 +-- pkg/kubelet/kubelet_test.go | 37 ++++++++++++++++--------- pkg/kubelet/status/status_manager.go | 41 ++++++++++++++++++++++++---- 3 files changed, 62 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1a62af4fe6467..6fccc7478f08f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2415,7 +2415,7 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { // and updates the pod to the failed phase in the status manager. func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message) - kl.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + kl.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{ QOSClass: v1qos.GetPodQOS(pod), // keep it as is Phase: v1.PodFailed, Reason: reason, @@ -2783,7 +2783,7 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { // 1. No containers exist - nothing to sync or clean up // 2. The pod is already in terminal Failed state // 3. REMOVE operation will eventually clean up podManager - if kl.isLocallyRejected(pod) { + if kl.statusManager.IsPodRejected(pod.UID) { continue } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 6861d7dbf2c16..9cad8b18f06ee 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -4605,7 +4605,17 @@ func TestIsLocallyRejected(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet - pod := newTestPods(1)[0] + + // Helper to create a unique pod for each test case. Each of the TCs needs a + // different pod UID. + podCounter := 0 + newUniquePod := func() *v1.Pod { + podCounter++ + pod := newTestPods(1)[0] + pod.UID = types.UID(fmt.Sprintf("test-pod-uid-%d", podCounter)) + pod.Name = fmt.Sprintf("test-pod-%d", podCounter) + return pod + } testCases := []struct { name string @@ -4617,7 +4627,7 @@ func TestIsLocallyRejected(t *testing.T) { { name: "no local status", setupPod: func() *v1.Pod { - return pod + return newUniquePod() }, setupLocalStatus: func(pod *v1.Pod) {}, expectedRejected: false, @@ -4626,7 +4636,7 @@ func TestIsLocallyRejected(t *testing.T) { { name: "local status without rejection message", setupPod: func() *v1.Pod { - return pod + return newUniquePod() }, setupLocalStatus: func(pod *v1.Pod) { kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ @@ -4641,10 +4651,10 @@ func TestIsLocallyRejected(t *testing.T) { { name: "rejected pod without deletion timestamp", setupPod: func() *v1.Pod { - return pod + return newUniquePod() }, setupLocalStatus: func(pod *v1.Pod) { - kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + kubelet.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{ Phase: v1.PodFailed, Reason: "OutOfcpu", Message: PodRejectionMessagePrefix + "insufficient CPU", @@ -4656,13 +4666,13 @@ func TestIsLocallyRejected(t *testing.T) { { name: "pod with deletion timestamp and rejection message", setupPod: func() *v1.Pod { - podCopy := pod + pod := newUniquePod() now := metav1.NewTime(time.Now()) - podCopy.DeletionTimestamp = &now - return podCopy + pod.DeletionTimestamp = &now + return pod }, setupLocalStatus: func(pod *v1.Pod) { - kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + kubelet.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{ Phase: v1.PodFailed, Reason: "OutOfcpu", Message: PodRejectionMessagePrefix + "insufficient CPU", @@ -4674,7 +4684,7 @@ func TestIsLocallyRejected(t *testing.T) { { name: "pod with partial rejection message prefix", setupPod: func() *v1.Pod { - return pod + return newUniquePod() }, setupLocalStatus: func(pod *v1.Pod) { kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ @@ -4689,7 +4699,7 @@ func TestIsLocallyRejected(t *testing.T) { { name: "pod with rejection message in the middle", setupPod: func() *v1.Pod { - return pod + return newUniquePod() }, setupLocalStatus: func(pod *v1.Pod) { kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ @@ -4716,6 +4726,7 @@ func TestIsLocallyRejected(t *testing.T) { func TestRejectPodUsesConstant(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() kubelet := testKubelet.kubelet @@ -4753,14 +4764,14 @@ func TestHandlePodUpdatesSkipsRejectedPods(t *testing.T) { normalPod.Name = "normal-pod" // Setup rejected pod status - kubelet.statusManager.SetPodStatus(klog.TODO(), rejectedPod, v1.PodStatus{ + kubelet.statusManager.SetPodRejected(klog.TODO(), rejectedPod, v1.PodStatus{ Phase: v1.PodFailed, Reason: "OutOfcpu", Message: PodRejectionMessagePrefix + "insufficient CPU", }) // Setup evicted pod status (has rejection message but also deletion timestamp) - kubelet.statusManager.SetPodStatus(klog.TODO(), evictedPod, v1.PodStatus{ + kubelet.statusManager.SetPodRejected(klog.TODO(), evictedPod, v1.PodStatus{ Phase: v1.PodFailed, Reason: "Evicted", Message: PodRejectionMessagePrefix + "evicted for testing", diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 236e06576e7e6..04abc44b0a575 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -59,6 +59,9 @@ type versionedPodStatus struct { // True if the status is generated at the end of SyncTerminatedPod, or after it is completed. podIsFinished bool + // True if the pod was rejected during admission. + podIsRejected bool + status v1.PodStatus } @@ -179,6 +182,12 @@ type Manager interface { // BackfillPodResizeConditions backfills the status manager's resize conditions by reading them from the // provided pods' statuses. BackfillPodResizeConditions(pods []*v1.Pod) + + // SetPodRejected caches the status for a pod and also marks it as rejected. + SetPodRejected(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) + + // IsPodRejected returns true if the pod was rejected during admission. + IsPodRejected(uid types.UID) bool } const syncPeriod = 10 * time.Second @@ -407,7 +416,17 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() + m.setPodStatusLocked(logger, pod, status, false) +} + +func (m *manager) SetPodRejected(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) { + m.podStatusesLock.Lock() + defer m.podStatusesLock.Unlock() + m.setPodStatusLocked(logger, pod, status, true) +} +// setPodStatusLocked updates the pod status cache. The caller must hold podStatusesLock. +func (m *manager) setPodStatusLocked(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, podIsRejected bool) { // Make sure we're caching a deep copy. status = *status.DeepCopy() @@ -417,7 +436,14 @@ func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodSta // Force a status update if deletion timestamp is set. This is necessary // because if the pod is in the non-running state, the pod worker still // needs to be able to trigger an update and/or deletion. - m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false) + m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false, podIsRejected) +} + +func (m *manager) IsPodRejected(uid types.UID) bool { + m.podStatusesLock.RLock() + defer m.podStatusesLock.RUnlock() + status, ok := m.podStatuses[types.UID(m.podManager.TranslatePodUID(uid))] + return ok && status.podIsRejected } func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { @@ -480,7 +506,7 @@ func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, co allContainerStatuses := append(status.InitContainerStatuses, status.ContainerStatuses...) updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(pod, &oldStatus.status, status.Conditions, allContainerStatuses, status.Phase)) updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(pod, &oldStatus.status, allContainerStatuses, status.Phase)) - m.updateStatusInternal(logger, pod, status, false, false) + m.updateStatusInternal(logger, pod, status, false, false, false) } func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, started bool) { @@ -522,7 +548,7 @@ func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, cont containerStatus, _, _ = findContainerStatus(&status, containerID.String()) containerStatus.Started = &started - m.updateStatusInternal(logger, pod, status, false, false) + m.updateStatusInternal(logger, pod, status, false, false, false) } func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) { @@ -616,7 +642,7 @@ func (m *manager) TerminatePod(logger klog.Logger, pod *v1.Pod) { } logger.V(5).Info("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID) - m.updateStatusInternal(logger, pod, status, true, true) + m.updateStatusInternal(logger, pod, status, true, true, false) } // hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which @@ -752,7 +778,7 @@ func checkContainerStateTransition(oldStatuses, newStatuses *v1.PodStatus, podSp // updateStatusInternal updates the internal status cache, and queues an update to the api server if // necessary. // This method IS NOT THREAD SAFE and must be called from a locked function. -func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) { +func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished, podIsRejected bool) { var oldStatus v1.PodStatus cachedStatus, isCached := m.podStatuses[pod.UID] if isCached { @@ -763,6 +789,10 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v logger.Info("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error", "pod", klog.KObj(pod)) podIsFinished = true } + if cachedStatus.podIsRejected && !podIsRejected { + logger.Info("Got unexpected podIsRejected=false, while podIsRejected=true in status cache, programmer error", "pod", klog.KObj(pod)) + podIsRejected = true + } } } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok { oldStatus = mirrorPod.Status @@ -855,6 +885,7 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v podName: pod.Name, podNamespace: pod.Namespace, podIsFinished: podIsFinished, + podIsRejected: podIsRejected, } // Multiple status updates can be generated before we update the API server, From 54fe4fa7864b94ea806709874d70af1ddb4a5ae1 Mon Sep 17 00:00:00 2001 From: Pablo Acevedo Montserrat Date: Wed, 4 Feb 2026 09:17:52 +0100 Subject: [PATCH 5/5] kubelet: Optimize rejected pod detection Signed-off-by: Pablo Acevedo Montserrat --- pkg/kubelet/kubelet.go | 11 +++---- pkg/kubelet/kubelet_pods.go | 28 ----------------- pkg/kubelet/kubelet_test.go | 63 +++++-------------------------------- 3 files changed, 12 insertions(+), 90 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 6fccc7478f08f..68f63a20531e8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2716,12 +2716,11 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } - // Skip previously rejected pods on kubelet restart. - // When kubelet restarts, all pods arrive as ADD operations. Pods that were - // previously rejected have Failed phase and skip the admission check above - // (due to IsPodPhaseTerminal check). Without this filter, they would enter - // pod_workers. - if kl.isLocallyRejected(pod) { + // Skip pods rejected during this kubelet session. + // After restart, previously rejected pods may enter pod_workers, but they + // will be immediately marked as terminated because their pod cache is empty + // (no containers were ever created), so they won't affect resource accounting. + if kl.statusManager.IsPodRejected(pod.UID) { continue } diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index 87cdb9cff1433..86eeb0bd26dc1 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1166,34 +1166,6 @@ func (kl *Kubelet) isAdmittedPodTerminal(pod *v1.Pod) bool { return false } -// isLocallyRejected checks if a pod was rejected by this kubelet instance. -// Rejected pods are pods that failed admission (e.g., insufficient resources, -// node affinity mismatch). These pods: -// - Never had containers created -// - Were marked as Failed with PodRejectionMessagePrefix in the status message -// - Should never enter the pod_workers pipeline -// -// This function checks both the local statusManager cache (for current session -// rejections) and the pod.Status from API server (for rejections that survived -// kubelet restart). -func (kl *Kubelet) isLocallyRejected(pod *v1.Pod) bool { - // Check local statusManager cache first (current session rejections). - // This catches pods rejected since the kubelet started. - localStatus, found := kl.statusManager.GetPodStatus(pod.UID) - if found && strings.HasPrefix(localStatus.Message, PodRejectionMessagePrefix) { - return true - } - - // Check API server status (rejections from before kubelet restart). - // When kubelet restarts, the local statusManager is empty, but the rejection - // status was synced to the API server and is available in pod.Status. - if strings.HasPrefix(pod.Status.Message, PodRejectionMessagePrefix) { - return true - } - - return false -} - // removeOrphanedPodStatuses removes obsolete entries in podStatus where // the pod is no longer considered bound to this node. func (kl *Kubelet) removeOrphanedPodStatuses(pods []*v1.Pod, mirrorPods []*v1.Pod) { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 9cad8b18f06ee..e87c5e4091633 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -4601,13 +4601,12 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { } } -func TestIsLocallyRejected(t *testing.T) { +func TestIsPodRejected(t *testing.T) { testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) defer testKubelet.Cleanup() kubelet := testKubelet.kubelet - // Helper to create a unique pod for each test case. Each of the TCs needs a - // different pod UID. + // Helper to create a unique pod for each test case. podCounter := 0 newUniquePod := func() *v1.Pod { podCounter++ @@ -4634,7 +4633,7 @@ func TestIsLocallyRejected(t *testing.T) { description: "pod with no local status should not be considered rejected", }, { - name: "local status without rejection message", + name: "local status set via SetPodStatus", setupPod: func() *v1.Pod { return newUniquePod() }, @@ -4646,10 +4645,10 @@ func TestIsLocallyRejected(t *testing.T) { }) }, expectedRejected: false, - description: "pod with Failed status but no rejection message should not be considered rejected", + description: "pod with status set via SetPodStatus should not be considered rejected", }, { - name: "rejected pod without deletion timestamp", + name: "rejected pod via SetPodRejected", setupPod: func() *v1.Pod { return newUniquePod() }, @@ -4661,55 +4660,7 @@ func TestIsLocallyRejected(t *testing.T) { }) }, expectedRejected: true, - description: "rejected pod without deletion timestamp should be considered rejected", - }, - { - name: "pod with deletion timestamp and rejection message", - setupPod: func() *v1.Pod { - pod := newUniquePod() - now := metav1.NewTime(time.Now()) - pod.DeletionTimestamp = &now - return pod - }, - setupLocalStatus: func(pod *v1.Pod) { - kubelet.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{ - Phase: v1.PodFailed, - Reason: "OutOfcpu", - Message: PodRejectionMessagePrefix + "insufficient CPU", - }) - }, - expectedRejected: true, - description: "pod with rejection message is considered rejected regardless of deletion timestamp", - }, - { - name: "pod with partial rejection message prefix", - setupPod: func() *v1.Pod { - return newUniquePod() - }, - setupLocalStatus: func(pod *v1.Pod) { - kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ - Phase: v1.PodFailed, - Reason: "OutOfMemory", - Message: "Pod was reject", - }) - }, - expectedRejected: false, - description: "pod with partial rejection message prefix should not be considered rejected", - }, - { - name: "pod with rejection message in the middle", - setupPod: func() *v1.Pod { - return newUniquePod() - }, - setupLocalStatus: func(pod *v1.Pod) { - kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ - Phase: v1.PodFailed, - Reason: "OutOfMemory", - Message: "Container failed because " + PodRejectionMessagePrefix + "insufficient resources", - }) - }, - expectedRejected: false, - description: "pod with rejection message not at start should not be considered rejected", + description: "pod rejected via SetPodRejected should be considered rejected", }, } @@ -4718,7 +4669,7 @@ func TestIsLocallyRejected(t *testing.T) { pod := tc.setupPod() tc.setupLocalStatus(pod) - result := kubelet.isLocallyRejected(pod) + result := kubelet.statusManager.IsPodRejected(pod.UID) assert.Equal(t, tc.expectedRejected, result, tc.description) }) }