Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions components/operator/internal/controller/reconcile_phases.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,16 @@ func (r *AgenticSessionReconciler) reconcileRunning(ctx context.Context, session
// Handle spec updates while running
if err := handlers.ReconcileSpecChanges(ctx, session); err != nil {
logger.Error(err, "Failed to reconcile running session spec", "name", name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
}

// Retry workflow reconciliation if it failed during session creation
if handlers.NeedsWorkflowReconciliation(session) {
logger.Info("WorkflowReconciled is not True, retrying workflow reconciliation", "name", name)
if err := handlers.ReconcileWorkflow(ctx, session); err != nil {
logger.Error(err, "Failed to reconcile workflow", "name", name)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
}

Expand Down
25 changes: 25 additions & 0 deletions components/operator/internal/handlers/helpers.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,31 @@ func clearAnnotation(sessionNamespace, name, annotationKey string) error {
return nil
}

// NeedsWorkflowReconciliation returns true if the session has an active workflow
// in its spec but the WorkflowReconciled condition is not True.
func NeedsWorkflowReconciliation(session *unstructured.Unstructured) bool {
spec, _, _ := unstructured.NestedMap(session.Object, "spec")
workflow, found, _ := unstructured.NestedMap(spec, "activeWorkflow")
if !found || len(workflow) == 0 {
return false
}
gitURL, _ := workflow["gitUrl"].(string)
if strings.TrimSpace(gitURL) == "" {
return false
}

status, _, _ := unstructured.NestedMap(session.Object, "status")
conditions, _ := status["conditions"].([]interface{})
for _, c := range conditions {
if cond, ok := c.(map[string]interface{}); ok {
if condType, _ := cond["type"].(string); strings.EqualFold(condType, conditionWorkflowReconciled) {
return cond["status"] != "True"
}
}
}
return true
}

// setCondition upserts a condition entry on the provided status map.
func setCondition(status map[string]interface{}, update conditionUpdate) {
now := time.Now().UTC().Format(time.RFC3339)
Expand Down
114 changes: 114 additions & 0 deletions components/operator/internal/handlers/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package handlers

import (
"testing"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func TestNeedsWorkflowReconciliation(t *testing.T) {
tests := []struct {
name string
session *unstructured.Unstructured
want bool
}{
{
name: "no activeWorkflow in spec",
session: makeSession(nil, nil),
want: false,
},
{
name: "activeWorkflow with empty gitUrl",
session: makeSession(
map[string]interface{}{"gitUrl": "", "branch": "main", "path": ".ambient/workflows/test"},
nil,
),
want: false,
},
{
name: "activeWorkflow present, no conditions at all",
session: makeSession(
map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"},
nil,
),
want: true,
},
{
name: "WorkflowReconciled condition is True",
session: makeSession(
map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"},
[]interface{}{
map[string]interface{}{
"type": "WorkflowReconciled",
"status": "True",
"reason": "Reconciled",
},
},
),
want: false,
},
{
name: "WorkflowReconciled condition is False",
session: makeSession(
map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"},
[]interface{}{
map[string]interface{}{
"type": "WorkflowReconciled",
"status": "False",
"reason": "UpdateFailed",
},
},
),
want: true,
},
{
name: "other conditions present but no WorkflowReconciled",
session: makeSession(
map[string]interface{}{"gitUrl": "https://github.com/org/repo.git", "branch": "main", "path": ".ambient/workflows/test"},
[]interface{}{
map[string]interface{}{
"type": "Ready",
"status": "True",
},
},
),
want: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NeedsWorkflowReconciliation(tt.session)
if got != tt.want {
t.Errorf("NeedsWorkflowReconciliation() = %v, want %v", got, tt.want)
}
})
}
}

func makeSession(activeWorkflow map[string]interface{}, conditions []interface{}) *unstructured.Unstructured {
obj := &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "vteam.ambient-code/v1alpha1",
"kind": "AgenticSession",
"metadata": map[string]interface{}{
"name": "test-session",
"namespace": "test-ns",
},
"spec": map[string]interface{}{},
},
}

if activeWorkflow != nil {
spec := obj.Object["spec"].(map[string]interface{})
spec["activeWorkflow"] = activeWorkflow
}

if conditions != nil {
obj.Object["status"] = map[string]interface{}{
"conditions": conditions,
}
}

return obj
}
28 changes: 26 additions & 2 deletions components/operator/internal/handlers/reconciler.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure
Reason: "RepoReconciliationFailed",
Message: fmt.Sprintf("Failed to reconcile repos: %v", err),
})
_ = statusPatch.Apply()
if applyErr := statusPatch.Apply(); applyErr != nil {
log.Printf("[Reconcile] Failed to apply repo error status for %s/%s: %v", namespace, name, applyErr)
}
return err
}

Expand All @@ -200,7 +202,9 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure
Reason: "WorkflowReconciliationFailed",
Message: fmt.Sprintf("Failed to reconcile workflow: %v", err),
})
_ = statusPatch.Apply()
if applyErr := statusPatch.Apply(); applyErr != nil {
log.Printf("[Reconcile] Failed to apply workflow error status for %s/%s: %v", namespace, name, applyErr)
}
return err
}

Expand All @@ -216,6 +220,26 @@ func ReconcileSpecChanges(ctx context.Context, session *unstructured.Unstructure
return statusPatch.Apply()
}

// ReconcileWorkflow reconciles just the active workflow for a session.
// Called from the Running phase when WorkflowReconciled condition is not True.
func ReconcileWorkflow(ctx context.Context, session *unstructured.Unstructured) error {
namespace := session.GetNamespace()
name := session.GetName()

spec, _, _ := unstructured.NestedMap(session.Object, "spec")
statusPatch := NewStatusPatch(namespace, name)

if err := reconcileActiveWorkflowWithPatch(namespace, name, spec, session, statusPatch); err != nil {
log.Printf("[Reconcile] Failed to reconcile workflow for %s/%s: %v", namespace, name, err)
if applyErr := statusPatch.Apply(); applyErr != nil {
log.Printf("[Reconcile] Failed to apply workflow error status for %s/%s: %v", namespace, name, applyErr)
}
return err
}

return statusPatch.Apply()
}

// UpdateSessionFromPodStatus updates the session status based on pod state.
func UpdateSessionFromPodStatus(ctx context.Context, session *unstructured.Unstructured, pod *corev1.Pod) error {
namespace := session.GetNamespace()
Expand Down
5 changes: 3 additions & 2 deletions components/operator/internal/handlers/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,9 @@ func handleAgenticSessionEvent(obj *unstructured.Unstructured) error {

// Extract spec information from the fresh object
spec, _, _ := unstructured.NestedMap(currentObj.Object, "spec")
_ = reconcileSpecReposWithPatch(sessionNamespace, name, spec, currentObj, statusPatch)
_ = reconcileActiveWorkflowWithPatch(sessionNamespace, name, spec, currentObj, statusPatch)
if err := reconcileSpecReposWithPatch(sessionNamespace, name, spec, currentObj, statusPatch); err != nil {
log.Printf("[Reconcile] Failed to reconcile repos during pending phase for %s/%s: %v", sessionNamespace, name, err)
}
prompt, _, _ := unstructured.NestedString(spec, "initialPrompt")
timeout, _, _ := unstructured.NestedInt64(spec, "timeout")
llmSettings, _, _ := unstructured.NestedMap(spec, "llmSettings")
Expand Down
Loading