From 4e36c8d0932e78b75ecea71d53e8464cdf6ee48c Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Tue, 3 Feb 2026 13:58:26 +0200 Subject: [PATCH 1/3] ssa: handle Jobs with TTL during health check Signed-off-by: Stefan Prodan --- ssa/manager_apply.go | 4 +- ssa/manager_delete_test.go | 2 +- ssa/manager_wait.go | 12 +++ ssa/manager_wait_test.go | 2 +- ssa/utils/job.go | 47 ++++++++++ ssa/utils/job_test.go | 173 +++++++++++++++++++++++++++++++++++++ 6 files changed, 236 insertions(+), 4 deletions(-) create mode 100644 ssa/utils/job.go create mode 100644 ssa/utils/job_test.go diff --git a/ssa/manager_apply.go b/ssa/manager_apply.go index bf7169477..76e5a5fb7 100644 --- a/ssa/manager_apply.go +++ b/ssa/manager_apply.go @@ -291,7 +291,7 @@ func (m *ResourceManager) ApplyAllStaged(ctx context.Context, objects []*unstruc } changeSet.Append(cs.Entries) - if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{opts.WaitInterval, opts.WaitTimeout, false}); err != nil { + if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{Interval: opts.WaitInterval, Timeout: opts.WaitTimeout}); err != nil { return changeSet, err } } @@ -304,7 +304,7 @@ func (m *ResourceManager) ApplyAllStaged(ctx context.Context, objects []*unstruc } changeSet.Append(cs.Entries) - if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{opts.WaitInterval, opts.WaitTimeout, false}); err != nil { + if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{Interval: opts.WaitInterval, Timeout: opts.WaitTimeout}); err != nil { return changeSet, err } } diff --git a/ssa/manager_delete_test.go b/ssa/manager_delete_test.go index 94be72ce6..f1cd60d39 100644 --- a/ssa/manager_delete_test.go +++ b/ssa/manager_delete_test.go @@ -98,7 +98,7 @@ func TestDelete(t *testing.T) { t.Error(err) } - if err := manager.WaitForTermination(objects, WaitOptions{time.Second, 5 * time.Second, false}); err != nil { + if err := manager.WaitForTermination(objects, WaitOptions{Interval: time.Second, Timeout: 5 * time.Second}); err != nil { // workaround for https://github.com/kubernetes-sigs/controller-runtime/issues/880 if !strings.Contains(err.Error(), "Namespace/") { t.Error(err) diff --git a/ssa/manager_wait.go b/ssa/manager_wait.go index a2f34d5db..98a9d1e55 100644 --- a/ssa/manager_wait.go +++ b/ssa/manager_wait.go @@ -51,6 +51,11 @@ type WaitOptions struct { // FailFast makes the Wait function return an error as soon as a resource reaches the failed state. FailFast bool + + // JobsWithTTL is a set of Job identifiers that have spec.ttlSecondsAfterFinished set. + // NotFound status for these Jobs is treated as successful completion since they + // are deleted by the TTL controller after finishing. + JobsWithTTL object.ObjMetadataSet } // DefaultWaitOptions returns the default wait options where the poll interval is set to @@ -149,7 +154,14 @@ func (m *ResourceManager) WaitForSetWithContext(ctx context.Context, set object. for id, rs := range statusCollector.ResourceStatuses { switch { case rs == nil || lastStatus[id] == nil: + // Skip Jobs with TTL that are deleted after completion + if opts.JobsWithTTL.Contains(id) { + continue + } errs = append(errs, fmt.Sprintf("can't determine status for %s", utils.FmtObjMetadata(id))) + case lastStatus[id].Status == status.NotFoundStatus && opts.JobsWithTTL.Contains(id): + // Job with TTL was deleted after completion, treat as successful completion + continue case lastStatus[id].Status == status.FailedStatus, errors.Is(ctx.Err(), context.DeadlineExceeded) && lastStatus[id].Status != status.CurrentStatus: diff --git a/ssa/manager_wait_test.go b/ssa/manager_wait_test.go index 448ff36f7..1f13d1b8f 100644 --- a/ssa/manager_wait_test.go +++ b/ssa/manager_wait_test.go @@ -72,7 +72,7 @@ func TestWaitForSet(t *testing.T) { t.Fatal(err) } - if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{time.Second, 3 * time.Second, false}); err == nil { + if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{Interval: time.Second, Timeout: 3 * time.Second}); err == nil { t.Error("wanted wait error due to observedGeneration < generation") } diff --git a/ssa/utils/job.go b/ssa/utils/job.go new file mode 100644 index 000000000..57903d2a9 --- /dev/null +++ b/ssa/utils/job.go @@ -0,0 +1,47 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "strings" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + "github.com/fluxcd/cli-utils/pkg/object" +) + +// ExtractJobsWithTTL returns the ObjMetadataSet of Jobs that have spec.ttlSecondsAfterFinished set. +func ExtractJobsWithTTL(objects []*unstructured.Unstructured) object.ObjMetadataSet { + var result object.ObjMetadataSet + for _, obj := range objects { + if !IsJob(obj) { + continue + } + // Check if ttlSecondsAfterFinished is set (any value including 0) + _, found, err := unstructured.NestedInt64(obj.Object, "spec", "ttlSecondsAfterFinished") + if err == nil && found { + result = append(result, object.UnstructuredToObjMetadata(obj)) + } + } + return result +} + +// IsJob returns true if the object is a Kubernetes Job. +func IsJob(obj *unstructured.Unstructured) bool { + return obj.GetKind() == "Job" && + strings.HasPrefix(obj.GetAPIVersion(), "batch/") +} diff --git a/ssa/utils/job_test.go b/ssa/utils/job_test.go new file mode 100644 index 000000000..84c71f484 --- /dev/null +++ b/ssa/utils/job_test.go @@ -0,0 +1,173 @@ +/* +Copyright 2026 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "testing" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func TestIsJob(t *testing.T) { + tests := []struct { + name string + apiVersion string + kind string + want bool + }{ + { + name: "batch/v1 Job", + apiVersion: "batch/v1", + kind: "Job", + want: true, + }, + { + name: "batch/v1beta1 Job", + apiVersion: "batch/v1beta1", + kind: "Job", + want: true, + }, + { + name: "apps/v1 Deployment", + apiVersion: "apps/v1", + kind: "Deployment", + want: false, + }, + { + name: "v1 ConfigMap", + apiVersion: "v1", + kind: "ConfigMap", + want: false, + }, + { + name: "batch/v1 CronJob", + apiVersion: "batch/v1", + kind: "CronJob", + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion(tt.apiVersion) + obj.SetKind(tt.kind) + + if got := IsJob(obj); got != tt.want { + t.Errorf("IsJob() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestExtractJobsWithTTL(t *testing.T) { + tests := []struct { + name string + objects []*unstructured.Unstructured + want int + }{ + { + name: "empty list", + objects: []*unstructured.Unstructured{}, + want: 0, + }, + { + name: "Job with ttlSecondsAfterFinished: 0", + objects: []*unstructured.Unstructured{ + makeJob("test-job", "default", int64Ptr(0)), + }, + want: 1, + }, + { + name: "Job with ttlSecondsAfterFinished: 300", + objects: []*unstructured.Unstructured{ + makeJob("test-job", "default", int64Ptr(300)), + }, + want: 1, + }, + { + name: "Job without ttlSecondsAfterFinished", + objects: []*unstructured.Unstructured{ + makeJob("test-job", "default", nil), + }, + want: 0, + }, + { + name: "non-Job resources", + objects: []*unstructured.Unstructured{ + makeDeployment("test-deploy", "default"), + makeConfigMap("test-cm", "default"), + }, + want: 0, + }, + { + name: "mixed list of resources", + objects: []*unstructured.Unstructured{ + makeJob("job-with-ttl", "default", int64Ptr(60)), + makeJob("job-without-ttl", "default", nil), + makeDeployment("test-deploy", "default"), + makeJob("another-job-with-ttl", "other", int64Ptr(0)), + }, + want: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ExtractJobsWithTTL(tt.objects) + if len(result) != tt.want { + t.Errorf("ExtractJobsWithTTL() returned %d items, want %d", len(result), tt.want) + } + }) + } +} + +func int64Ptr(v int64) *int64 { + return &v +} + +func makeJob(name, namespace string, ttlSecondsAfterFinished *int64) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion("batch/v1") + obj.SetKind("Job") + obj.SetName(name) + obj.SetNamespace(namespace) + + if ttlSecondsAfterFinished != nil { + _ = unstructured.SetNestedField(obj.Object, *ttlSecondsAfterFinished, "spec", "ttlSecondsAfterFinished") + } + + return obj +} + +func makeDeployment(name, namespace string) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion("apps/v1") + obj.SetKind("Deployment") + obj.SetName(name) + obj.SetNamespace(namespace) + return obj +} + +func makeConfigMap(name, namespace string) *unstructured.Unstructured { + obj := &unstructured.Unstructured{} + obj.SetAPIVersion("v1") + obj.SetKind("ConfigMap") + obj.SetName(name) + obj.SetNamespace(namespace) + return obj +} From 5a6950833b7b4df03d8880ec20589e654c698e9d Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Tue, 3 Feb 2026 14:27:33 +0200 Subject: [PATCH 2/3] ssa: add Jobs with TTL wait test Signed-off-by: Stefan Prodan --- ssa/manager_wait_test.go | 79 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/ssa/manager_wait_test.go b/ssa/manager_wait_test.go index 1f13d1b8f..8c9892101 100644 --- a/ssa/manager_wait_test.go +++ b/ssa/manager_wait_test.go @@ -507,6 +507,85 @@ func TestWaitWithContext_Cancellation(t *testing.T) { } } +func TestWaitForSet_JobWithTTL(t *testing.T) { + g := NewWithT(t) + + id := generateName("job-ttl") + + // Create a Job with ttlSecondsAfterFinished set + job := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": map[string]any{ + "name": id, + "namespace": "default", + }, + "spec": map[string]any{ + "ttlSecondsAfterFinished": int64(0), + "template": map[string]any{ + "spec": map[string]any{ + "restartPolicy": "Never", + "containers": []any{ + map[string]any{ + "name": "test", + "image": "busybox", + "command": []any{"echo", "hello"}, + }, + }, + }, + }, + }, + }, + } + + // Apply the Job + _, err := manager.ApplyAll(context.Background(), []*unstructured.Unstructured{job}, DefaultApplyOptions()) + g.Expect(err).NotTo(HaveOccurred()) + + jobObjMeta := object.UnstructuredToObjMetadata(job) + + // Use a custom status reader that returns NotFoundStatus for the Job + // to simulate the TTL controller deleting it after completion + manager.poller = polling.NewStatusPoller(manager.client, restMapper, polling.Options{ + CustomStatusReaders: []engine.StatusReader{ + kstatusreaders.NewGenericStatusReader(restMapper, + func(u *unstructured.Unstructured) (*status.Result, error) { + if u.GetKind() == "Job" && u.GetName() == id { + return &status.Result{ + Status: status.NotFoundStatus, + Message: "Resource not found", + }, nil + } + return status.Compute(u) + }, + ), + }, + }) + defer func() { + manager.poller = poller + }() + + t.Run("NotFound Job with TTL is treated as success", func(t *testing.T) { + err = manager.WaitForSet([]object.ObjMetadata{jobObjMeta}, WaitOptions{ + Interval: 100 * time.Millisecond, + Timeout: 2 * time.Second, + JobsWithTTL: object.ObjMetadataSet{jobObjMeta}, + }) + g.Expect(err).NotTo(HaveOccurred(), "NotFound status for Job with TTL should be treated as success") + }) + + t.Run("NotFound Job without TTL option is treated as error", func(t *testing.T) { + err = manager.WaitForSet([]object.ObjMetadata{jobObjMeta}, WaitOptions{ + Interval: 100 * time.Millisecond, + Timeout: 2 * time.Second, + // JobsWithTTL not set + }) + g.Expect(err).To(HaveOccurred(), "NotFound status for Job without TTL option should be an error") + g.Expect(err.Error()).To(ContainSubstring("NotFound")) + }) +} + func TestWaitForSetTermination(t *testing.T) { timeout := 10 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) From 56ded96ec8f8c6e29c3a8cef33b0ba2377d1d259 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Tue, 3 Feb 2026 14:36:51 +0200 Subject: [PATCH 3/3] ssa: treats NotFound Jobs with TTL as Current Signed-off-by: Stefan Prodan --- ssa/manager_wait.go | 8 +++++++- ssa/manager_wait_test.go | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/ssa/manager_wait.go b/ssa/manager_wait.go index 98a9d1e55..4674525e4 100644 --- a/ssa/manager_wait.go +++ b/ssa/manager_wait.go @@ -108,8 +108,14 @@ func (m *ResourceManager) WaitForSetWithContext(ctx context.Context, set object. } lastStatus[rs.Identifier] = rs + // Treat NotFound Jobs with TTL as Current for aggregation purposes + effectiveStatus := rs.Status + if rs.Status == status.NotFoundStatus && opts.JobsWithTTL.Contains(rs.Identifier) { + effectiveStatus = status.CurrentStatus + } + rss = append(rss, rs) - counts[rs.Status]++ + counts[effectiveStatus]++ } // If only Failed or Current statuses are present, diff --git a/ssa/manager_wait_test.go b/ssa/manager_wait_test.go index 8c9892101..cba11f1ef 100644 --- a/ssa/manager_wait_test.go +++ b/ssa/manager_wait_test.go @@ -567,12 +567,16 @@ func TestWaitForSet_JobWithTTL(t *testing.T) { }() t.Run("NotFound Job with TTL is treated as success", func(t *testing.T) { + start := time.Now() err = manager.WaitForSet([]object.ObjMetadata{jobObjMeta}, WaitOptions{ Interval: 100 * time.Millisecond, - Timeout: 2 * time.Second, + Timeout: 5 * time.Second, JobsWithTTL: object.ObjMetadataSet{jobObjMeta}, }) + elapsed := time.Since(start) + g.Expect(err).NotTo(HaveOccurred(), "NotFound status for Job with TTL should be treated as success") + g.Expect(elapsed).To(BeNumerically("<", 2*time.Second), "should return early, not wait for full timeout") }) t.Run("NotFound Job without TTL option is treated as error", func(t *testing.T) {