diff --git a/components/operator/internal/controller/reconcile_phases.go b/components/operator/internal/controller/reconcile_phases.go old mode 100644 new mode 100755 index 56123c884..fc02bc181 --- a/components/operator/internal/controller/reconcile_phases.go +++ b/components/operator/internal/controller/reconcile_phases.go @@ -331,6 +331,16 @@ func (r *AgenticSessionReconciler) reconcileRunning(ctx context.Context, session // Handle spec updates while running if err := handlers.ReconcileSpecChanges(ctx, session); err != nil { logger.Error(err, "Failed to reconcile running session spec", "name", name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } + } + + // Retry workflow reconciliation if it failed during session creation + if handlers.NeedsWorkflowReconciliation(session) { + logger.Info("WorkflowReconciled is not True, retrying workflow reconciliation", "name", name) + if err := handlers.ReconcileWorkflow(ctx, session); err != nil { + logger.Error(err, "Failed to reconcile workflow", "name", name) + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil } } diff --git a/components/operator/internal/handlers/helpers.go b/components/operator/internal/handlers/helpers.go old mode 100644 new mode 100755 index d87dc37ab..2e426320d --- a/components/operator/internal/handlers/helpers.go +++ b/components/operator/internal/handlers/helpers.go @@ -217,6 +217,31 @@ func clearAnnotation(sessionNamespace, name, annotationKey string) error { return nil } +// NeedsWorkflowReconciliation returns true if the session has an active workflow +// in its spec but the WorkflowReconciled condition is not True. +func NeedsWorkflowReconciliation(session *unstructured.Unstructured) bool { + spec, _, _ := unstructured.NestedMap(session.Object, "spec") + workflow, found, _ := unstructured.NestedMap(spec, "activeWorkflow") + if !found || len(workflow) == 0 { + return false + } + gitURL, _ := workflow["gitUrl"].(string) + if strings.TrimSpace(gitURL) == "" { + return false + } + + status, _, _ := unstructured.NestedMap(session.Object, "status") + conditions, _ := status["conditions"].([]interface{}) + for _, c := range conditions { + if cond, ok := c.(map[string]interface{}); ok { + if condType, _ := cond["type"].(string); strings.EqualFold(condType, conditionWorkflowReconciled) { + return cond["status"] != "True" + } + } + } + return true +} + // setCondition upserts a condition entry on the provided status map. func setCondition(status map[string]interface{}, update conditionUpdate) { now := time.Now().UTC().Format(time.RFC3339) diff --git a/components/operator/internal/handlers/helpers_test.go b/components/operator/internal/handlers/helpers_test.go new file mode 100644 index 000000000..423d2f979 --- /dev/null +++ b/components/operator/internal/handlers/helpers_test.go @@ -0,0 +1,114 @@ +package handlers + +import ( + "testing" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func TestNeedsWorkflowReconciliation(t *testing.T) { + tests := []struct { + name string + session *unstructured.Unstructured + want bool + }{ + { + name: "no activeWorkflow in spec", + session: makeSession(nil, nil), + want: false, + }, + { + name: "activeWorkflow with empty gitUrl", + session: makeSession( + map[string]interface{}{"gitUrl": "", "branch": "main", "path": ".ambient/workflows/test"}, + nil, + ), + want: false, + }, + { + name: "activeWorkflow present, no conditions at all", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + nil, + ), + want: true, + }, + { + name: "WorkflowReconciled condition is True", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + []interface{}{ + map[string]interface{}{ + "type": "WorkflowReconciled", + "status": "True", + "reason": "Reconciled", + }, + }, + ), + want: false, + }, + { + name: "WorkflowReconciled condition is False", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + []interface{}{ + map[string]interface{}{ + "type": "WorkflowReconciled", + "status": "False", + "reason": "UpdateFailed", + }, + }, + ), + want: true, + }, + { + name: "other conditions present but no WorkflowReconciled", + session: makeSession( + map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"}, + []interface{}{ + map[string]interface{}{ + "type": "Ready", + "status": "True", + }, + }, + ), + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := NeedsWorkflowReconciliation(tt.session) + if got != tt.want { + t.Errorf("NeedsWorkflowReconciliation() = %v, want %v", got, tt.want) + } + }) + } +} + +func makeSession(activeWorkflow map[string]interface{}, conditions []interface{}) *unstructured.Unstructured { + obj := &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "vteam.ambient-code/v1alpha1", + "kind": "AgenticSession", + "metadata": map[string]interface{}{ + "name": "test-session", + "namespace": "test-ns", + }, + "spec": map[string]interface{}{}, + }, + } + + if activeWorkflow != nil { + spec := obj.Object["spec"].(map[string]interface{}) + spec["activeWorkflow"] = activeWorkflow + } + + if conditions != nil { + obj.Object["status"] = map[string]interface{}{ + "conditions": conditions, + } + } + + return obj +} diff --git a/components/operator/internal/handlers/reconciler.go b/components/operator/internal/handlers/reconciler.go old mode 100644 new mode 100755 index e63a7ee44..02b9f62db --- a/components/operator/internal/handlers/reconciler.go +++ b/components/operator/internal/handlers/reconciler.go @@ -187,7 +187,9 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure Reason: "RepoReconciliationFailed", Message: fmt.Sprintf("Failed to reconcile repos: %v", err), }) - _ = statusPatch.Apply() + if applyErr := statusPatch.Apply(); applyErr != nil { + log.Printf("[Reconcile] Failed to apply repo error status for %s/%s: %v", namespace, name, applyErr) + } return err } @@ -200,7 +202,9 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure Reason: "WorkflowReconciliationFailed", Message: fmt.Sprintf("Failed to reconcile workflow: %v", err), }) - _ = statusPatch.Apply() + if applyErr := statusPatch.Apply(); applyErr != nil { + log.Printf("[Reconcile] Failed to apply workflow error status for %s/%s: %v", namespace, name, applyErr) + } return err } @@ -216,6 +220,26 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure return statusPatch.Apply() } +// ReconcileWorkflow reconciles just the active workflow for a session. +// Called from the Running phase when WorkflowReconciled condition is not True. +func ReconcileWorkflow(ctx context.Context, session *unstructured.Unstructured) error { + namespace := session.GetNamespace() + name := session.GetName() + + spec, _, _ := unstructured.NestedMap(session.Object, "spec") + statusPatch := NewStatusPatch(namespace, name) + + if err := reconcileActiveWorkflowWithPatch(namespace, name, spec, session, statusPatch); err != nil { + log.Printf("[Reconcile] Failed to reconcile workflow for %s/%s: %v", namespace, name, err) + if applyErr := statusPatch.Apply(); applyErr != nil { + log.Printf("[Reconcile] Failed to apply workflow error status for %s/%s: %v", namespace, name, applyErr) + } + return err + } + + return statusPatch.Apply() +} + // UpdateSessionFromPodStatus updates the session status based on pod state. func UpdateSessionFromPodStatus(ctx context.Context, session *unstructured.Unstructured, pod *corev1.Pod) error { namespace := session.GetNamespace() diff --git a/components/operator/internal/handlers/sessions.go b/components/operator/internal/handlers/sessions.go index bfe44b31e..94186b026 100755 --- a/components/operator/internal/handlers/sessions.go +++ b/components/operator/internal/handlers/sessions.go @@ -675,8 +675,9 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error { // Extract spec information from the fresh object spec, _, _ := unstructured.NestedMap(currentObj.Object, "spec") - _ = reconcileSpecReposWithPatch(sessionNamespace, name, spec, currentObj, statusPatch) - _ = reconcileActiveWorkflowWithPatch(sessionNamespace, name, spec, currentObj, statusPatch) + if err := reconcileSpecReposWithPatch(sessionNamespace, name, spec, currentObj, statusPatch); err != nil { + log.Printf("[Reconcile] Failed to reconcile repos during pending phase for %s/%s: %v", sessionNamespace, name, err) + } prompt, _, _ := unstructured.NestedString(spec, "initialPrompt") timeout, _, _ := unstructured.NestedInt64(spec, "timeout") llmSettings, _, _ := unstructured.NestedMap(spec, "llmSettings")