diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e72cf3d9b203b..68f63a20531e8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -229,6 +229,10 @@ const ( // instrumentationScope is the name of OpenTelemetry instrumentation scope instrumentationScope = "k8s.io/kubernetes/pkg/kubelet" + + // PodRejectionMessagePrefix is the prefix used in status messages to identify a + // pod that was rejected by the kubelet and should never enter the pod worker pipeline. + PodRejectionMessagePrefix = "Pod was rejected: " ) var ( @@ -2411,11 +2415,11 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { // and updates the pod to the failed phase in the status manager. func (kl *Kubelet) rejectPod(pod *v1.Pod, reason, message string) { kl.recorder.Eventf(pod, v1.EventTypeWarning, reason, message) - kl.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + kl.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{ QOSClass: v1qos.GetPodQOS(pod), // keep it as is Phase: v1.PodFailed, Reason: reason, - Message: "Pod was rejected: " + message}) + Message: PodRejectionMessagePrefix + message}) } func recordAdmissionRejection(reason string) { @@ -2711,6 +2715,15 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { } } } + + // Skip pods rejected during this kubelet session. + // After restart, previously rejected pods may enter pod_workers, but they + // will be immediately marked as terminated because their pod cache is empty + // (no containers were ever created), so they won't affect resource accounting. + if kl.statusManager.IsPodRejected(pod.UID) { + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, @@ -2761,6 +2774,18 @@ func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) { } } + // Skip rejected pods during UPDATE operations. + // Pods always arrive as ADD before UPDATE (enforced by the config layer). 
+ // If a pod failed admission in HandlePodAdditions, it was rejected and + // never sent to pod_workers. UPDATE operations for such pods should be + // ignored because: + // 1. No containers exist - nothing to sync or clean up + // 2. The pod is already in terminal Failed state + // 3. REMOVE operation will eventually clean up podManager + if kl.statusManager.IsPodRejected(pod.UID) { + continue + } + kl.podWorkers.UpdatePod(UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 7746458001108..e87c5e4091633 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1760,7 +1760,12 @@ func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) { } // Let the pod worker sets the status to fail after this sync. - kubelet.HandlePodUpdates(pods) + // Use HandlePodSyncs to test active deadline behavior. + // This bypasses admission and sends pods directly to the pod worker. + // HandlePodUpdates requires the pod to already be known to the worker, + // and HandlePodAdditions would reject the pod due to test node capacity. + kubelet.podManager.SetPods(pods) + kubelet.HandlePodSyncs(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.Equal(t, v1.PodFailed, status.Phase) @@ -1809,8 +1814,11 @@ func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) { }}, } + // Use HandlePodSyncs to test active deadline behavior. + // This bypasses admission and sends pods directly to the pod worker. + // HandlePodUpdates requires the pod to already be known to the worker. 
kubelet.podManager.SetPods(pods) - kubelet.HandlePodUpdates(pods) + kubelet.HandlePodSyncs(pods) status, found := kubelet.statusManager.GetPodStatus(pods[0].UID) assert.True(t, found, "expected to found status for pod %q", pods[0].UID) assert.NotEqual(t, v1.PodFailed, status.Phase) @@ -4592,3 +4600,151 @@ func TestHandlePodReconcile_RetryPendingResizes(t *testing.T) { }) } } + +func TestIsPodRejected(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + + // Helper to create a unique pod for each test case. + podCounter := 0 + newUniquePod := func() *v1.Pod { + podCounter++ + pod := newTestPods(1)[0] + pod.UID = types.UID(fmt.Sprintf("test-pod-uid-%d", podCounter)) + pod.Name = fmt.Sprintf("test-pod-%d", podCounter) + return pod + } + + testCases := []struct { + name string + setupPod func() *v1.Pod + setupLocalStatus func(*v1.Pod) + expectedRejected bool + description string + }{ + { + name: "no local status", + setupPod: func() *v1.Pod { + return newUniquePod() + }, + setupLocalStatus: func(pod *v1.Pod) {}, + expectedRejected: false, + description: "pod with no local status should not be considered rejected", + }, + { + name: "local status set via SetPodStatus", + setupPod: func() *v1.Pod { + return newUniquePod() + }, + setupLocalStatus: func(pod *v1.Pod) { + kubelet.statusManager.SetPodStatus(klog.TODO(), pod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "ImagePullBackOff", + Message: "Failed to pull image", + }) + }, + expectedRejected: false, + description: "pod with status set via SetPodStatus should not be considered rejected", + }, + { + name: "rejected pod via SetPodRejected", + setupPod: func() *v1.Pod { + return newUniquePod() + }, + setupLocalStatus: func(pod *v1.Pod) { + kubelet.statusManager.SetPodRejected(klog.TODO(), pod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "OutOfcpu", + Message: PodRejectionMessagePrefix + "insufficient CPU", + }) 
+ }, + expectedRejected: true, + description: "pod rejected via SetPodRejected should be considered rejected", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pod := tc.setupPod() + tc.setupLocalStatus(pod) + + result := kubelet.statusManager.IsPodRejected(pod.UID) + assert.Equal(t, tc.expectedRejected, result, tc.description) + }) + } +} + +func TestRejectPodUsesConstant(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + + pod := newTestPods(1)[0] + reason := "OutOfcpu" + message := "insufficient CPU resources" + + kubelet.rejectPod(pod, reason, message) + + status, found := kubelet.statusManager.GetPodStatus(pod.UID) + assert.True(t, found, "expected to find status for rejected pod") + assert.Equal(t, v1.PodFailed, status.Phase) + assert.Equal(t, reason, status.Reason) + + expectedMessage := PodRejectionMessagePrefix + message + assert.Equal(t, expectedMessage, status.Message, "rejectPod should use PodRejectionMessagePrefix constant") + assert.True(t, strings.HasPrefix(status.Message, PodRejectionMessagePrefix), "rejection message should start with prefix") +} + +func TestHandlePodUpdatesSkipsRejectedPods(t *testing.T) { + testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */) + defer testKubelet.Cleanup() + kubelet := testKubelet.kubelet + + // Create test pods + rejectedPod := newTestPods(1)[0] + evictedPod := newTestPods(1)[0] + evictedPod.UID = "evicted-pod-uid" + evictedPod.Name = "evicted-pod" + now := metav1.NewTime(time.Now()) + evictedPod.DeletionTimestamp = &now + + normalPod := newTestPods(1)[0] + normalPod.UID = "normal-pod-uid" + normalPod.Name = "normal-pod" + + // Setup rejected pod status + kubelet.statusManager.SetPodRejected(klog.TODO(), rejectedPod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "OutOfcpu", + Message: PodRejectionMessagePrefix + "insufficient CPU", + }) + + // 
Setup evicted pod status (has rejection message but also deletion timestamp) + kubelet.statusManager.SetPodRejected(klog.TODO(), evictedPod, v1.PodStatus{ + Phase: v1.PodFailed, + Reason: "Evicted", + Message: PodRejectionMessagePrefix + "evicted for testing", + }) + + // Add all pods to podManager + kubelet.podManager.AddPod(rejectedPod) + kubelet.podManager.AddPod(evictedPod) + kubelet.podManager.AddPod(normalPod) + + // Call HandlePodUpdates - this should skip every pod marked rejected (including the + // evicted pod, which was also marked rejected above) and only process the normal pod + kubelet.HandlePodUpdates([]*v1.Pod{rejectedPod, evictedPod, normalPod}) + + // Wait a short time for processing + time.Sleep(10 * time.Millisecond) + + // Verify by checking if the pods are known to pod workers + // Rejected pods should not be known to pod workers since they were skipped + isRejectedKnown := kubelet.podWorkers.IsPodTerminationRequested(rejectedPod.UID) + assert.False(t, isRejectedKnown, "rejected pod should not be known to pod workers") + + // Note: We can't easily test that the normal pod was processed + // without more complex setup, but the main thing we're testing is that + // rejected pods are correctly skipped and therefore never reach the pod workers +} diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 236e06576e7e6..04abc44b0a575 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -59,6 +59,9 @@ type versionedPodStatus struct { // True if the status is generated at the end of SyncTerminatedPod, or after it is completed. podIsFinished bool + // True if the pod was rejected during admission. + podIsRejected bool + status v1.PodStatus } @@ -179,6 +182,12 @@ type Manager interface { // BackfillPodResizeConditions backfills the status manager's resize conditions by reading them from the // provided pods' statuses. BackfillPodResizeConditions(pods []*v1.Pod) + + // SetPodRejected caches the status for a pod and also marks it as rejected. 
+ SetPodRejected(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) + + // IsPodRejected returns true if the pod was rejected during admission. + IsPodRejected(uid types.UID) bool } const syncPeriod = 10 * time.Second @@ -407,7 +416,17 @@ func (m *manager) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) { m.podStatusesLock.Lock() defer m.podStatusesLock.Unlock() + m.setPodStatusLocked(logger, pod, status, false) +} + +func (m *manager) SetPodRejected(logger klog.Logger, pod *v1.Pod, status v1.PodStatus) { + m.podStatusesLock.Lock() + defer m.podStatusesLock.Unlock() + m.setPodStatusLocked(logger, pod, status, true) +} +// setPodStatusLocked updates the pod status cache. The caller must hold podStatusesLock. +func (m *manager) setPodStatusLocked(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, podIsRejected bool) { // Make sure we're caching a deep copy. status = *status.DeepCopy() @@ -417,7 +436,14 @@ func (m *manager) SetPodStatus(logger klog.Logger, pod *v1.Pod, status v1.PodSta // Force a status update if deletion timestamp is set. This is necessary // because if the pod is in the non-running state, the pod worker still // needs to be able to trigger an update and/or deletion. 
- m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false) + m.updateStatusInternal(logger, pod, status, pod.DeletionTimestamp != nil, false, podIsRejected) +} + +func (m *manager) IsPodRejected(uid types.UID) bool { + m.podStatusesLock.RLock() + defer m.podStatusesLock.RUnlock() + status, ok := m.podStatuses[types.UID(m.podManager.TranslatePodUID(uid))] + return ok && status.podIsRejected } func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, ready bool) { @@ -480,7 +506,7 @@ func (m *manager) SetContainerReadiness(logger klog.Logger, podUID types.UID, co allContainerStatuses := append(status.InitContainerStatuses, status.ContainerStatuses...) updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(pod, &oldStatus.status, status.Conditions, allContainerStatuses, status.Phase)) updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(pod, &oldStatus.status, allContainerStatuses, status.Phase)) - m.updateStatusInternal(logger, pod, status, false, false) + m.updateStatusInternal(logger, pod, status, false, false, false) } func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, containerID kubecontainer.ContainerID, started bool) { @@ -522,7 +548,7 @@ func (m *manager) SetContainerStartup(logger klog.Logger, podUID types.UID, cont containerStatus, _, _ = findContainerStatus(&status, containerID.String()) containerStatus.Started = &started - m.updateStatusInternal(logger, pod, status, false, false) + m.updateStatusInternal(logger, pod, status, false, false, false) } func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) { @@ -616,7 +642,7 @@ func (m *manager) TerminatePod(logger klog.Logger, pod *v1.Pod) { } logger.V(5).Info("TerminatePod calling updateStatusInternal", "pod", klog.KObj(pod), "podUID", pod.UID) - m.updateStatusInternal(logger, pod, status, true, true) + 
m.updateStatusInternal(logger, pod, status, true, true, false) } // hasPodInitialized returns true if the pod has no evidence of ever starting a regular container, which @@ -752,7 +778,7 @@ func checkContainerStateTransition(oldStatuses, newStatuses *v1.PodStatus, podSp // updateStatusInternal updates the internal status cache, and queues an update to the api server if // necessary. // This method IS NOT THREAD SAFE and must be called from a locked function. -func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished bool) { +func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v1.PodStatus, forceUpdate, podIsFinished, podIsRejected bool) { var oldStatus v1.PodStatus cachedStatus, isCached := m.podStatuses[pod.UID] if isCached { @@ -763,6 +789,10 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v logger.Info("Got unexpected podIsFinished=false, while podIsFinished=true in status cache, programmer error", "pod", klog.KObj(pod)) podIsFinished = true } + if cachedStatus.podIsRejected && !podIsRejected { + logger.Info("Got unexpected podIsRejected=false, while podIsRejected=true in status cache, programmer error", "pod", klog.KObj(pod)) + podIsRejected = true + } } } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok { oldStatus = mirrorPod.Status @@ -855,6 +885,7 @@ func (m *manager) updateStatusInternal(logger klog.Logger, pod *v1.Pod, status v podName: pod.Name, podNamespace: pod.Namespace, podIsFinished: podIsFinished, + podIsRejected: podIsRejected, } // Multiple status updates can be generated before we update the API server,