From c81e8e0b349080576e12f5264d9d7a6fdc32e3a2 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 30 Apr 2026 12:42:12 +0000 Subject: [PATCH 1/2] fix(operator): defer workflow reconciliation until runner is ready Workflow reconciliation was attempted during the Pending phase before the runner pod existed, causing DNS failures that were silently discarded. The observedGeneration was then set, preventing any retry in the Running phase. Move workflow reconciliation to the Running phase where the runner HTTP endpoint is reachable. Add NeedsWorkflowReconciliation check and ReconcileWorkflow to retry failed or missing workflow application with 5-second backoff. Also fix swallowed errors in ReconcileSpecChanges and add requeue on spec reconciliation failure. Closes #1486 Co-Authored-By: Claude Opus 4.6 --- .../internal/controller/reconcile_phases.go | 10 ++ .../operator/internal/handlers/helpers.go | 25 ++++ .../internal/handlers/helpers_test.go | 114 ++++++++++++++++++ .../operator/internal/handlers/reconciler.go | 28 ++++- .../operator/internal/handlers/sessions.go | 5 +- 5 files changed, 178 insertions(+), 4 deletions(-) mode change 100644 => 100755 components/operator/internal/controller/reconcile_phases.go mode change 100644 => 100755 components/operator/internal/handlers/helpers.go create mode 100644 components/operator/internal/handlers/helpers_test.go mode change 100644 => 100755 components/operator/internal/handlers/reconciler.go diff --git a/components/operator/internal/controller/reconcile_phases.go b/components/operator/internal/controller/reconcile_phases.go old mode 100644 new mode 100755 index 56123c884..fc02bc181 --- a/components/operator/internal/controller/reconcile_phases.go +++ b/components/operator/internal/controller/reconcile_phases.go @@ -331,6 +331,16 @@ func (r *AgenticSessionReconciler) reconcileRunning(ctx context.Context, session // Handle spec updates while running if err := handlers.ReconcileSpecChanges(ctx, session); err != nil { logger.Error(err, "Failed to reconcile running session spec", "name", name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + } + + // Retry workflow reconciliation if it failed during session creation + if handlers.NeedsWorkflowReconciliation(session) { + logger.Info("WorkflowReconciled is not True, retrying workflow reconciliation", "name", name) + if err := handlers.ReconcileWorkflow(ctx, session); err != nil { + logger.Error(err, "Failed to reconcile workflow", "name", name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } } diff --git a/components/operator/internal/handlers/helpers.go b/components/operator/internal/handlers/helpers.go old mode 100644 new mode 100755 index d87dc37ab..2e426320d --- a/components/operator/internal/handlers/helpers.go +++ b/components/operator/internal/handlers/helpers.go @@ -217,6 +217,31 @@ func clearAnnotation(sessionNamespace, name, annotationKey string) error { return nil } +// NeedsWorkflowReconciliation returns true if the session has an active workflow +// in its spec but the WorkflowReconciled condition is not True. +func NeedsWorkflowReconciliation(session *unstructured.Unstructured) bool { + spec, _, _ := unstructured.NestedMap(session.Object, "spec") + workflow, found, _ := unstructured.NestedMap(spec, "activeWorkflow") + if !found || len(workflow) == 0 { + return false + } + gitURL, _ := workflow["gitUrl"].(string) + if strings.TrimSpace(gitURL) == "" { + return false + } + + status, _, _ := unstructured.NestedMap(session.Object, "status") + conditions, _ := status["conditions"].([]interface{}) + for _, c := range conditions { + if cond, ok := c.(map[string]interface{}); ok { + if condType, _ := cond["type"].(string); strings.EqualFold(condType, conditionWorkflowReconciled) { + return cond["status"] != "True" + } + } + } + return true +} + // setCondition upserts a condition entry on the provided status map. func setCondition(status map[string]interface{}, update conditionUpdate) { now := time.Now().UTC().Format(time.RFC3339) diff --git a/components/operator/internal/handlers/helpers_test.go b/components/operator/internal/handlers/helpers_test.go new file mode 100644 index 000000000..4cf92265c --- /dev/null +++ b/components/operator/internal/handlers/helpers_test.go @@ -0,0 +1,114 @@ +package handlers + +import ( + "testing" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func TestNeedsWorkflowReconciliation(t *testing.T) { + tests := []struct { + name string + session *unstructured.Unstructured + want bool + }{ + { + name: "no activeWorkflow in spec", + session: makeSession(nil, nil), + want: false, + }, + { + name: "activeWorkflow with empty gitUrl", + session: makeSession( + map[string]interface{}{"gitUrl": "", "branch": "main", "path": ".ambient/workflows/test"}, + nil, + ), + want: false, + }, + { + name: "activeWorkflow present, no conditions at all", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + nil, + ), + want: true, + }, + { + name: "WorkflowReconciled condition is True", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + []interface{}{ + map[string]interface{}{ + "type": "WorkflowReconciled", + "status": "True", + "reason": "Reconciled", + }, + }, + ), + want: false, + }, + { + name: "WorkflowReconciled condition is False", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + []interface{}{ + map[string]interface{}{ + "type": "WorkflowReconciled", + "status": "False", + "reason": "UpdateFailed", + }, + }, + ), + want: true, + }, + { + name: "other conditions present but no WorkflowReconciled", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + []interface{}{ + map[string]interface{}{ + "type": "Ready", + "status": "True", + }, + }, + ), + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NeedsWorkflowReconciliation(tt.session) + if got != tt.want { + t.Errorf("NeedsWorkflowReconciliation() = %v, want %v", got, tt.want) + } + }) + } +} + +func makeSession(activeWorkflow map[string]interface{}, conditions []interface{}) *unstructured.Unstructured { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "vteam.ambient-code/v1alpha1", + "kind": "AgenticSession", + "metadata": map[string]interface{}{ + "name": "test-session", + "namespace": "test-ns", + }, + "spec": map[string]interface{}{}, + }, + } + + if activeWorkflow != nil { + spec := obj.Object["spec"].(map[string]interface{}) + spec["activeWorkflow"] = activeWorkflow + } + + if conditions != nil { + obj.Object["status"] = map[string]interface{}{ + "conditions": conditions, + } + } + + return obj +} diff --git a/components/operator/internal/handlers/reconciler.go b/components/operator/internal/handlers/reconciler.go old mode 100644 new mode 100755 index e63a7ee44..02b9f62db --- a/components/operator/internal/handlers/reconciler.go +++ b/components/operator/internal/handlers/reconciler.go @@ -187,7 +187,9 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure Reason: "RepoReconciliationFailed", Message: fmt.Sprintf("Failed to reconcile repos: %v", err), }) - _ = statusPatch.Apply() + if applyErr := statusPatch.Apply(); applyErr != nil { + log.Printf("[Reconcile] Failed to apply repo error status for %s/%s: %v", namespace, name, applyErr) + } return err } @@ -200,7 +202,9 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure Reason: "WorkflowReconciliationFailed", Message: fmt.Sprintf("Failed to reconcile workflow: %v", err), }) - _ = statusPatch.Apply() + if applyErr := statusPatch.Apply(); applyErr != nil { + log.Printf("[Reconcile] Failed to apply workflow error status for %s/%s: %v", namespace, name, applyErr) + } return err } @@ -216,6 +220,26 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure return statusPatch.Apply() } +// ReconcileWorkflow reconciles just the active workflow for a session. +// Called from the Running phase when WorkflowReconciled condition is not True. +func ReconcileWorkflow(ctx context.Context, session *unstructured.Unstructured) error { + namespace := session.GetNamespace() + name := session.GetName() + + spec, _, _ := unstructured.NestedMap(session.Object, "spec") + statusPatch := NewStatusPatch(namespace, name) + + if err := reconcileActiveWorkflowWithPatch(namespace, name, spec, session, statusPatch); err != nil { + log.Printf("[Reconcile] Failed to reconcile workflow for %s/%s: %v", namespace, name, err) + if applyErr := statusPatch.Apply(); applyErr != nil { + log.Printf("[Reconcile] Failed to apply workflow error status for %s/%s: %v", namespace, name, applyErr) + } + return err + } + + return statusPatch.Apply() +} + // UpdateSessionFromPodStatus updates the session status based on pod state. func UpdateSessionFromPodStatus(ctx context.Context, session *unstructured.Unstructured, pod *corev1.Pod) error { namespace := session.GetNamespace() diff --git a/components/operator/internal/handlers/sessions.go b/components/operator/internal/handlers/sessions.go index bfe44b31e..94186b026 100755 --- a/components/operator/internal/handlers/sessions.go +++ b/components/operator/internal/handlers/sessions.go @@ -675,8 +675,9 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error { // Extract spec information from the fresh object spec, _, _ := unstructured.NestedMap(currentObj.Object, "spec") - _ = reconcileSpecReposWithPatch(sessionNamespace, name, spec, currentObj, statusPatch) - _ = reconcileActiveWorkflowWithPatch(sessionNamespace, name, spec, currentObj, statusPatch) + if err := reconcileSpecReposWithPatch(sessionNamespace, name, spec, currentObj, statusPatch); err != nil { + log.Printf("[Reconcile] Failed to reconcile repos during pending phase for %s/%s: %v", sessionNamespace, name, err) + } prompt, _, _ := unstructured.NestedString(spec, "initialPrompt") timeout, _, _ := unstructured.NestedInt64(spec, "timeout") llmSettings, _, _ := unstructured.NestedMap(spec, "llmSettings") From fed63879c23a752225dbdfaa6382f0ce7647d252 Mon Sep 17 00:00:00 2001 From: Ambient Code Bot Date: Thu, 30 Apr 2026 12:42:50 +0000 Subject: [PATCH 2/2] style(operator): gofmt helpers_test.go Co-Authored-By: Claude Opus 4.6 --- components/operator/internal/handlers/helpers_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/components/operator/internal/handlers/helpers_test.go b/components/operator/internal/handlers/helpers_test.go index 4cf92265c..423d2f979 100644 --- a/components/operator/internal/handlers/helpers_test.go +++ b/components/operator/internal/handlers/helpers_test.go @@ -8,14 +8,14 @@ import ( func TestNeedsWorkflowReconciliation(t *testing.T) { tests := []struct { - name string + name string session *unstructured.Unstructured - want bool + want bool }{ { - name: "no activeWorkflow in spec", + name: "no activeWorkflow in spec", session: makeSession(nil, nil), - want: false, + want: false, }, { name: "activeWorkflow with empty gitUrl",