Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ssa/manager_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (m *ResourceManager) ApplyAllStaged(ctx context.Context, objects []*unstruc
}
changeSet.Append(cs.Entries)

if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{opts.WaitInterval, opts.WaitTimeout, false}); err != nil {
if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{Interval: opts.WaitInterval, Timeout: opts.WaitTimeout}); err != nil {
return changeSet, err
}
}
Expand All @@ -304,7 +304,7 @@ func (m *ResourceManager) ApplyAllStaged(ctx context.Context, objects []*unstruc
}
changeSet.Append(cs.Entries)

if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{opts.WaitInterval, opts.WaitTimeout, false}); err != nil {
if err := m.WaitForSet(cs.ToObjMetadataSet(), WaitOptions{Interval: opts.WaitInterval, Timeout: opts.WaitTimeout}); err != nil {
return changeSet, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion ssa/manager_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestDelete(t *testing.T) {
t.Error(err)
}

if err := manager.WaitForTermination(objects, WaitOptions{time.Second, 5 * time.Second, false}); err != nil {
if err := manager.WaitForTermination(objects, WaitOptions{Interval: time.Second, Timeout: 5 * time.Second}); err != nil {
// workaround for https://github.com/kubernetes-sigs/controller-runtime/issues/880
if !strings.Contains(err.Error(), "Namespace/") {
t.Error(err)
Expand Down
20 changes: 19 additions & 1 deletion ssa/manager_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type WaitOptions struct {

// FailFast makes the Wait function return an error as soon as a resource reaches the failed state.
FailFast bool

// JobsWithTTL is a set of Job identifiers that have spec.ttlSecondsAfterFinished set.
// NotFound status for these Jobs is treated as successful completion since they
// are deleted by the TTL controller after finishing.
JobsWithTTL object.ObjMetadataSet
}

// DefaultWaitOptions returns the default wait options where the poll interval is set to
Expand Down Expand Up @@ -103,8 +108,14 @@ func (m *ResourceManager) WaitForSetWithContext(ctx context.Context, set object.
}
lastStatus[rs.Identifier] = rs

// Treat NotFound Jobs with TTL as Current for aggregation purposes
effectiveStatus := rs.Status
if rs.Status == status.NotFoundStatus && opts.JobsWithTTL.Contains(rs.Identifier) {
effectiveStatus = status.CurrentStatus
}

rss = append(rss, rs)
counts[rs.Status]++
counts[effectiveStatus]++
}

// If only Failed or Current statuses are present,
Expand Down Expand Up @@ -149,7 +160,14 @@ func (m *ResourceManager) WaitForSetWithContext(ctx context.Context, set object.
for id, rs := range statusCollector.ResourceStatuses {
switch {
case rs == nil || lastStatus[id] == nil:
// Skip Jobs with TTL that are deleted after completion
if opts.JobsWithTTL.Contains(id) {
continue
}
errs = append(errs, fmt.Sprintf("can't determine status for %s", utils.FmtObjMetadata(id)))
case lastStatus[id].Status == status.NotFoundStatus && opts.JobsWithTTL.Contains(id):
// Job with TTL was deleted after completion, treat as successful completion
continue
case lastStatus[id].Status == status.FailedStatus,
errors.Is(ctx.Err(), context.DeadlineExceeded) &&
lastStatus[id].Status != status.CurrentStatus:
Expand Down
85 changes: 84 additions & 1 deletion ssa/manager_wait_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWaitForSet(t *testing.T) {
t.Fatal(err)
}

if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{time.Second, 3 * time.Second, false}); err == nil {
if err := manager.WaitForSet(changeSet.ToObjMetadataSet(), WaitOptions{Interval: time.Second, Timeout: 3 * time.Second}); err == nil {
t.Error("wanted wait error due to observedGeneration < generation")
}

Expand Down Expand Up @@ -507,6 +507,89 @@ func TestWaitWithContext_Cancellation(t *testing.T) {
}
}

func TestWaitForSet_JobWithTTL(t *testing.T) {
g := NewWithT(t)

id := generateName("job-ttl")

// Create a Job with ttlSecondsAfterFinished set
job := &unstructured.Unstructured{
Object: map[string]any{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": map[string]any{
"name": id,
"namespace": "default",
},
"spec": map[string]any{
"ttlSecondsAfterFinished": int64(0),
"template": map[string]any{
"spec": map[string]any{
"restartPolicy": "Never",
"containers": []any{
map[string]any{
"name": "test",
"image": "busybox",
"command": []any{"echo", "hello"},
},
},
},
},
},
},
}

// Apply the Job
_, err := manager.ApplyAll(context.Background(), []*unstructured.Unstructured{job}, DefaultApplyOptions())
g.Expect(err).NotTo(HaveOccurred())

jobObjMeta := object.UnstructuredToObjMetadata(job)

// Use a custom status reader that returns NotFoundStatus for the Job
// to simulate the TTL controller deleting it after completion
manager.poller = polling.NewStatusPoller(manager.client, restMapper, polling.Options{
CustomStatusReaders: []engine.StatusReader{
kstatusreaders.NewGenericStatusReader(restMapper,
func(u *unstructured.Unstructured) (*status.Result, error) {
if u.GetKind() == "Job" && u.GetName() == id {
return &status.Result{
Status: status.NotFoundStatus,
Message: "Resource not found",
}, nil
}
return status.Compute(u)
},
),
},
})
defer func() {
manager.poller = poller
}()

t.Run("NotFound Job with TTL is treated as success", func(t *testing.T) {
start := time.Now()
err = manager.WaitForSet([]object.ObjMetadata{jobObjMeta}, WaitOptions{
Interval: 100 * time.Millisecond,
Timeout: 5 * time.Second,
JobsWithTTL: object.ObjMetadataSet{jobObjMeta},
})
elapsed := time.Since(start)

g.Expect(err).NotTo(HaveOccurred(), "NotFound status for Job with TTL should be treated as success")
g.Expect(elapsed).To(BeNumerically("<", 2*time.Second), "should return early, not wait for full timeout")
})

t.Run("NotFound Job without TTL option is treated as error", func(t *testing.T) {
err = manager.WaitForSet([]object.ObjMetadata{jobObjMeta}, WaitOptions{
Interval: 100 * time.Millisecond,
Timeout: 2 * time.Second,
// JobsWithTTL not set
})
g.Expect(err).To(HaveOccurred(), "NotFound status for Job without TTL option should be an error")
g.Expect(err.Error()).To(ContainSubstring("NotFound"))
})
}

func TestWaitForSetTermination(t *testing.T) {
timeout := 10 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
Expand Down
47 changes: 47 additions & 0 deletions ssa/utils/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
Copyright 2026 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/fluxcd/cli-utils/pkg/object"
)

// ExtractJobsWithTTL returns the ObjMetadataSet of Jobs that have spec.ttlSecondsAfterFinished set.
func ExtractJobsWithTTL(objects []*unstructured.Unstructured) object.ObjMetadataSet {
var result object.ObjMetadataSet
for _, obj := range objects {
if !IsJob(obj) {
continue
}
// Check if ttlSecondsAfterFinished is set (any value including 0)
_, found, err := unstructured.NestedInt64(obj.Object, "spec", "ttlSecondsAfterFinished")
if err == nil && found {
result = append(result, object.UnstructuredToObjMetadata(obj))
}
}
return result
}

// IsJob returns true if the object is a Kubernetes Job.
func IsJob(obj *unstructured.Unstructured) bool {
return obj.GetKind() == "Job" &&
strings.HasPrefix(obj.GetAPIVersion(), "batch/")
}
173 changes: 173 additions & 0 deletions ssa/utils/job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
Copyright 2026 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"testing"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func TestIsJob(t *testing.T) {
tests := []struct {
name string
apiVersion string
kind string
want bool
}{
{
name: "batch/v1 Job",
apiVersion: "batch/v1",
kind: "Job",
want: true,
},
{
name: "batch/v1beta1 Job",
apiVersion: "batch/v1beta1",
kind: "Job",
want: true,
},
{
name: "apps/v1 Deployment",
apiVersion: "apps/v1",
kind: "Deployment",
want: false,
},
{
name: "v1 ConfigMap",
apiVersion: "v1",
kind: "ConfigMap",
want: false,
},
{
name: "batch/v1 CronJob",
apiVersion: "batch/v1",
kind: "CronJob",
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
obj := &unstructured.Unstructured{}
obj.SetAPIVersion(tt.apiVersion)
obj.SetKind(tt.kind)

if got := IsJob(obj); got != tt.want {
t.Errorf("IsJob() = %v, want %v", got, tt.want)
}
})
}
}

func TestExtractJobsWithTTL(t *testing.T) {
tests := []struct {
name string
objects []*unstructured.Unstructured
want int
}{
{
name: "empty list",
objects: []*unstructured.Unstructured{},
want: 0,
},
{
name: "Job with ttlSecondsAfterFinished: 0",
objects: []*unstructured.Unstructured{
makeJob("test-job", "default", int64Ptr(0)),
},
want: 1,
},
{
name: "Job with ttlSecondsAfterFinished: 300",
objects: []*unstructured.Unstructured{
makeJob("test-job", "default", int64Ptr(300)),
},
want: 1,
},
{
name: "Job without ttlSecondsAfterFinished",
objects: []*unstructured.Unstructured{
makeJob("test-job", "default", nil),
},
want: 0,
},
{
name: "non-Job resources",
objects: []*unstructured.Unstructured{
makeDeployment("test-deploy", "default"),
makeConfigMap("test-cm", "default"),
},
want: 0,
},
{
name: "mixed list of resources",
objects: []*unstructured.Unstructured{
makeJob("job-with-ttl", "default", int64Ptr(60)),
makeJob("job-without-ttl", "default", nil),
makeDeployment("test-deploy", "default"),
makeJob("another-job-with-ttl", "other", int64Ptr(0)),
},
want: 2,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := ExtractJobsWithTTL(tt.objects)
if len(result) != tt.want {
t.Errorf("ExtractJobsWithTTL() returned %d items, want %d", len(result), tt.want)
}
})
}
}

func int64Ptr(v int64) *int64 {
return &v
}

func makeJob(name, namespace string, ttlSecondsAfterFinished *int64) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetAPIVersion("batch/v1")
obj.SetKind("Job")
obj.SetName(name)
obj.SetNamespace(namespace)

if ttlSecondsAfterFinished != nil {
_ = unstructured.SetNestedField(obj.Object, *ttlSecondsAfterFinished, "spec", "ttlSecondsAfterFinished")
}

return obj
}

func makeDeployment(name, namespace string) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetAPIVersion("apps/v1")
obj.SetKind("Deployment")
obj.SetName(name)
obj.SetNamespace(namespace)
return obj
}

func makeConfigMap(name, namespace string) *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetAPIVersion("v1")
obj.SetKind("ConfigMap")
obj.SetName(name)
obj.SetNamespace(namespace)
return obj
}
Loading