From b1a1dda8ef96eb390a17539c5cf8add25dff57bc Mon Sep 17 00:00:00 2001 From: Anuj Agrawal Date: Tue, 24 Mar 2026 23:46:16 +0530 Subject: [PATCH 1/6] fix: deleting a TWD leaves stale versioning data on Temporal server, blocking unversioned workers Signed-off-by: Anuj Agrawal --- internal/controller/worker_controller.go | 167 +++++++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 25f2f533..1e00c0a6 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -10,10 +10,13 @@ import ( "fmt" "time" + "github.com/go-logr/logr" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/temporal" + "go.temporal.io/api/serviceerror" + sdkclient "go.temporal.io/sdk/client" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -25,6 +28,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -41,6 +45,10 @@ const ( // wrtWorkerRefKey is the field index key for WorkerResourceTemplate by temporalWorkerDeploymentRef.name. wrtWorkerRefKey = ".spec.temporalWorkerDeploymentRef.name" + + // twdFinalizerName is the finalizer added to TemporalWorkerDeployment resources + // to ensure Temporal server-side versioning data is cleaned up before the CRD is deleted. + twdFinalizerName = "temporal.io/worker-deployment-cleanup" ) // getAPIKeySecretName extracts the secret name from a SecretKeySelector @@ -133,6 +141,35 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, r.markWRTsTWDNotFound(ctx, req.NamespacedName) } + // Handle deletion: clean up Temporal server-side versioning data before allowing + // the CRD to be deleted. Without this, stale build ID routing persists in Temporal + // and prevents unversioned workers from picking up tasks on the same task queue. + if !workerDeploy.DeletionTimestamp.IsZero() { + if controllerutil.ContainsFinalizer(&workerDeploy, twdFinalizerName) { + l.Info("TemporalWorkerDeployment is being deleted, running cleanup") + if err := r.handleDeletion(ctx, l, &workerDeploy); err != nil { + l.Error(err, "failed to clean up Temporal server-side deployment data") + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + + // Cleanup succeeded, remove the finalizer so K8s can delete the resource + controllerutil.RemoveFinalizer(&workerDeploy, twdFinalizerName) + if err := r.Update(ctx, &workerDeploy); err != nil { + return ctrl.Result{}, err + } + l.Info("Temporal server-side cleanup complete, finalizer removed") + } + return ctrl.Result{}, nil + } + + // Ensure finalizer is present on non-deleted resources + if !controllerutil.ContainsFinalizer(&workerDeploy, twdFinalizerName) { + controllerutil.AddFinalizer(&workerDeploy, twdFinalizerName) + if err := r.Update(ctx, &workerDeploy); err != nil { + return ctrl.Result{}, err + } + } + // TODO(jlegrone): Set defaults via webhook rather than manually if err := workerDeploy.Default(ctx, &workerDeploy); err != nil { l.Error(err, "TemporalWorkerDeployment defaulter failed") @@ -336,6 +373,136 @@ func (r *TemporalWorkerDeploymentReconciler) markWRTsTWDNotFound(ctx context.Con return errors.Join(errs...) } +// handleDeletion cleans up Temporal server-side deployment versioning data when +// a TemporalWorkerDeployment CRD is deleted. This prevents stale build ID routing +// from blocking unversioned workers on the same task queue. +// +// The cleanup sequence: +// 1. Set the current version to "unversioned" (empty BuildID) so new tasks route to unversioned workers +// 2. Delete all non-current/non-ramping versions (drained/inactive ones) +// 3. The deployment itself will be garbage collected by Temporal once all versions are removed +func (r *TemporalWorkerDeploymentReconciler) handleDeletion( + ctx context.Context, + l logr.Logger, + workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, +) error { + // Resolve Temporal connection + var temporalConnection temporaliov1alpha1.TemporalConnection + if err := r.Get(ctx, types.NamespacedName{ + Name: workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, + Namespace: workerDeploy.Namespace, + }, &temporalConnection); err != nil { + if apierrors.IsNotFound(err) { + // TemporalConnection already deleted; we can't talk to Temporal. + // Log a warning and allow deletion to proceed to avoid blocking forever. + l.Info("TemporalConnection not found, skipping Temporal server-side cleanup") + return nil + } + return fmt.Errorf("unable to fetch TemporalConnection: %w", err) + } + + authMode, secretName, err := resolveAuthSecretName(&temporalConnection) + if err != nil { + return fmt.Errorf("unable to resolve auth secret name: %w", err) + } + + temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{ + HostPort: temporalConnection.Spec.HostPort, + Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace, + SecretName: secretName, + AuthMode: authMode, + }) + if !ok { + clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{ + K8sNamespace: workerDeploy.Namespace, + TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace, + Spec: temporalConnection.Spec, + Identity: getControllerIdentity(), + }) + if err != nil { + return fmt.Errorf("unable to parse Temporal auth secret: %w", err) + } + c, err := r.TemporalClientPool.DialAndUpsertClient(*clientOpts, *key, *clientAuth) + if err != nil { + return fmt.Errorf("unable to create TemporalClient: %w", err) + } + temporalClient = c + } + + workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy) + deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(workerDeploymentName) + + // Describe the deployment to get current state + resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{}) + if err != nil { + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + l.Info("Worker Deployment not found on Temporal server, nothing to clean up") + return nil + } + return fmt.Errorf("unable to describe worker deployment: %w", err) + } + + routingConfig := resp.Info.RoutingConfig + + // Step 1: Set current version to unversioned (empty BuildID) so tasks route to unversioned workers. + // This is the critical step that unblocks task dispatch. + if routingConfig.CurrentVersion != nil { + l.Info("Setting current version to unversioned", "previousBuildID", routingConfig.CurrentVersion.BuildID) + if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: "", // empty = unversioned + ConflictToken: resp.ConflictToken, + Identity: getControllerIdentity(), + IgnoreMissingTaskQueues: true, + }); err != nil { + return fmt.Errorf("unable to set current version to unversioned: %w", err) + } + l.Info("Successfully set current version to unversioned") + } else { + l.Info("No current version set, skipping unversioned redirect") + } + + // Step 2: If there's a ramping version, clear it. + if routingConfig.RampingVersion != nil { + l.Info("Clearing ramping version", "buildID", routingConfig.RampingVersion.BuildID) + if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{ + BuildID: "", + Percentage: 0, + Identity: getControllerIdentity(), + }); err != nil { + l.Info("Failed to clear ramping version (may have been cleared by SetCurrentVersion)", "error", err) + } + } + + // Step 3: Delete versions that are eligible. Versions that are still draining + // are force-deleted with SkipDrainage since the TWD is being removed entirely. + for _, version := range resp.Info.VersionSummaries { + buildID := version.Version.BuildID + l.Info("Deleting worker deployment version", "buildID", buildID) + if _, err := deploymentHandler.DeleteVersion(ctx, sdkclient.WorkerDeploymentDeleteVersionOptions{ + BuildID: buildID, + SkipDrainage: true, + Identity: getControllerIdentity(), + }); err != nil { + // Log but don't fail -- the version may still have pollers or be current. + // Temporal will garbage collect it eventually. + l.Info("Could not delete version (will be garbage collected)", "buildID", buildID, "error", err) + } + } + + // Step 4: Attempt to delete the deployment itself. This only succeeds if all versions are gone. + l.Info("Attempting to delete worker deployment from Temporal server", "name", workerDeploymentName) + if _, err := temporalClient.WorkerDeploymentClient().Delete(ctx, sdkclient.WorkerDeploymentDeleteOptions{ + Name: workerDeploymentName, + Identity: getControllerIdentity(), + }); err != nil { + // Non-fatal: deployment will be garbage collected once all versions drain. + l.Info("Could not delete worker deployment (will be garbage collected)", "name", workerDeploymentName, "error", err) + } + + return nil +} + // setCondition sets a condition on the TemporalWorkerDeployment status. func (r *TemporalWorkerDeploymentReconciler) setCondition( workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, From 0027f6bf451eb038a28fbf0321c7ac9d7ee9ffb7 Mon Sep 17 00:00:00 2001 From: Anuj Agrawal Date: Wed, 25 Mar 2026 17:32:15 +0530 Subject: [PATCH 2/6] fix: persist connection until twd deletes for cleanup Signed-off-by: Anuj Agrawal --- .../templates/rbac.yaml | 8 ++ internal/controller/worker_controller.go | 99 +++++++++++++++++-- 2 files changed, 99 insertions(+), 8 deletions(-) diff --git a/helm/temporal-worker-controller/templates/rbac.yaml b/helm/temporal-worker-controller/templates/rbac.yaml index b73faca5..7175029a 100644 --- a/helm/temporal-worker-controller/templates/rbac.yaml +++ b/helm/temporal-worker-controller/templates/rbac.yaml @@ -96,7 +96,15 @@ rules: verbs: - get - list + - patch + - update - watch + - apiGroups: + - temporal.io + resources: + - temporalconnections/finalizers + verbs: + - update - apiGroups: - temporal.io resources: diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 1e00c0a6..3f195699 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -49,6 +49,11 @@ const ( // twdFinalizerName is the finalizer added to TemporalWorkerDeployment resources // to ensure Temporal server-side versioning data is cleaned up before the CRD is deleted. twdFinalizerName = "temporal.io/worker-deployment-cleanup" + + // tcFinalizerName is the finalizer added to TemporalConnection resources to prevent + // them from being deleted while any TemporalWorkerDeployment still references them. + // This ensures the connection is available during TWD deletion cleanup. + tcFinalizerName = "temporal.io/connection-in-use" ) // getAPIKeySecretName extracts the secret name from a SecretKeySelector @@ -106,7 +111,8 @@ type TemporalWorkerDeploymentReconciler struct { //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update -//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch +//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch +//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update @@ -152,6 +158,12 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } + // Remove our finalizer from the TemporalConnection if no other TWDs reference it. + if err := r.removeConnectionFinalizerIfUnused(ctx, l, &workerDeploy); err != nil { + l.Error(err, "failed to remove finalizer from TemporalConnection") + return ctrl.Result{}, err + } + // Cleanup succeeded, remove the finalizer so K8s can delete the resource controllerutil.RemoveFinalizer(&workerDeploy, twdFinalizerName) if err := r.Update(ctx, &workerDeploy); err != nil { @@ -201,6 +213,13 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } + // Ensure our finalizer is on the TemporalConnection so it cannot be deleted + // while this TWD still references it. This guarantees the connection is available + // during TWD deletion cleanup. + if err := r.ensureConnectionFinalizer(ctx, l, &temporalConnection); err != nil { + return ctrl.Result{}, err + } + // Get the Auth Mode and Secret Name authMode, secretName, err := resolveAuthSecretName(&temporalConnection) if err != nil { @@ -386,18 +405,14 @@ func (r *TemporalWorkerDeploymentReconciler) handleDeletion( l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, ) error { - // Resolve Temporal connection + // Resolve Temporal connection. + // The TemporalConnection is guaranteed to exist because we hold a finalizer on it + // that prevents deletion while any TWD references it. var temporalConnection temporaliov1alpha1.TemporalConnection if err := r.Get(ctx, types.NamespacedName{ Name: workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name, Namespace: workerDeploy.Namespace, }, &temporalConnection); err != nil { - if apierrors.IsNotFound(err) { - // TemporalConnection already deleted; we can't talk to Temporal. - // Log a warning and allow deletion to proceed to avoid blocking forever. - l.Info("TemporalConnection not found, skipping Temporal server-side cleanup") - return nil - } return fmt.Errorf("unable to fetch TemporalConnection: %w", err) } @@ -590,6 +605,74 @@ func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetBlocked( _ = r.Status().Update(ctx, workerDeploy) } +// ensureConnectionFinalizer adds our finalizer to the TemporalConnection so it +// cannot be deleted while this TWD still needs it for cleanup. +func (r *TemporalWorkerDeploymentReconciler) ensureConnectionFinalizer( + ctx context.Context, + l logr.Logger, + tc *temporaliov1alpha1.TemporalConnection, +) error { + if !controllerutil.ContainsFinalizer(tc, tcFinalizerName) { + l.Info("Adding finalizer to TemporalConnection", "connection", tc.Name) + controllerutil.AddFinalizer(tc, tcFinalizerName) + if err := r.Update(ctx, tc); err != nil { + return fmt.Errorf("unable to add finalizer to TemporalConnection %q: %w", tc.Name, err) + } + } + return nil +} + +// removeConnectionFinalizerIfUnused removes our finalizer from the TemporalConnection +// if no other TWDs (besides the one being deleted) still reference it. +func (r *TemporalWorkerDeploymentReconciler) removeConnectionFinalizerIfUnused( + ctx context.Context, + l logr.Logger, + deletingTWD *temporaliov1alpha1.TemporalWorkerDeployment, +) error { + connectionName := deletingTWD.Spec.WorkerOptions.TemporalConnectionRef.Name + + // List all TWDs in the same namespace + var twds temporaliov1alpha1.TemporalWorkerDeploymentList + if err := r.List(ctx, &twds, client.InNamespace(deletingTWD.Namespace)); err != nil { + return fmt.Errorf("unable to list TWDs: %w", err) + } + + // Check if any other TWD (not the one being deleted) references this connection + for i := range twds.Items { + twd := &twds.Items[i] + if twd.Name == deletingTWD.Name { + continue + } + if twd.Spec.WorkerOptions.TemporalConnectionRef.Name == connectionName { + l.Info("TemporalConnection still referenced by another TWD, keeping finalizer", + "connection", connectionName, "referencedBy", twd.Name) + return nil + } + } + + // No other TWDs reference this connection, remove the finalizer + var tc temporaliov1alpha1.TemporalConnection + if err := r.Get(ctx, types.NamespacedName{ + Name: connectionName, + Namespace: deletingTWD.Namespace, + }, &tc); err != nil { + if apierrors.IsNotFound(err) { + return nil // already gone + } + return fmt.Errorf("unable to fetch TemporalConnection %q: %w", connectionName, err) + } + + if controllerutil.ContainsFinalizer(&tc, tcFinalizerName) { + l.Info("Removing finalizer from TemporalConnection", "connection", connectionName) + controllerutil.RemoveFinalizer(&tc, tcFinalizerName) + if err := r.Update(ctx, &tc); err != nil { + return fmt.Errorf("unable to remove finalizer from TemporalConnection %q: %w", connectionName, err) + } + } + + return nil +} + // SetupWithManager sets up the controller with the Manager. func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error { if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string { From 95b3661429d2aef2e0fca81f8f0e64c7fa28e115 Mon Sep 17 00:00:00 2001 From: Anuj Agrawal Date: Wed, 25 Mar 2026 20:47:01 +0530 Subject: [PATCH 3/6] fix: add else log for no ramping version Signed-off-by: Anuj Agrawal --- internal/controller/worker_controller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 3f195699..7cc41e78 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -487,6 +487,8 @@ func (r *TemporalWorkerDeploymentReconciler) handleDeletion( }); err != nil { l.Info("Failed to clear ramping version (may have been cleared by SetCurrentVersion)", "error", err) } + } else { + l.Info("No ramping version set, skipping clear ramping version") } // Step 3: Delete versions that are eligible. Versions that are still draining From 70509315fa76e96a1d2f59a2fbedb381f1692ea3 Mon Sep 17 00:00:00 2001 From: Anuj Agrawal Date: Wed, 25 Mar 2026 20:59:41 +0530 Subject: [PATCH 4/6] fix: use single temporal.io/delete-protection finalizer Signed-off-by: Anuj Agrawal --- internal/controller/worker_controller.go | 29 +++++++++++------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 7cc41e78..63d940be 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -46,14 +46,11 @@ const ( // wrtWorkerRefKey is the field index key for WorkerResourceTemplate by temporalWorkerDeploymentRef.name. wrtWorkerRefKey = ".spec.temporalWorkerDeploymentRef.name" - // twdFinalizerName is the finalizer added to TemporalWorkerDeployment resources - // to ensure Temporal server-side versioning data is cleaned up before the CRD is deleted. - twdFinalizerName = "temporal.io/worker-deployment-cleanup" - - // tcFinalizerName is the finalizer added to TemporalConnection resources to prevent - // them from being deleted while any TemporalWorkerDeployment still references them. - // This ensures the connection is available during TWD deletion cleanup. - tcFinalizerName = "temporal.io/connection-in-use" + // finalizerName is the finalizer added to TemporalWorkerDeployment and TemporalConnection + // resources to prevent deletion before cleanup actions are taken. On TWD resources, it + // ensures Temporal server-side versioning data is cleaned up. On TemporalConnection + // resources, it prevents deletion while any TWD still references the connection. + finalizerName = "temporal.io/delete-protection" ) // getAPIKeySecretName extracts the secret name from a SecretKeySelector @@ -151,7 +148,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req // the CRD to be deleted. Without this, stale build ID routing persists in Temporal // and prevents unversioned workers from picking up tasks on the same task queue. if !workerDeploy.DeletionTimestamp.IsZero() { - if controllerutil.ContainsFinalizer(&workerDeploy, twdFinalizerName) { + if controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) { l.Info("TemporalWorkerDeployment is being deleted, running cleanup") if err := r.handleDeletion(ctx, l, &workerDeploy); err != nil { l.Error(err, "failed to clean up Temporal server-side deployment data") @@ -165,7 +162,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req } // Cleanup succeeded, remove the finalizer so K8s can delete the resource - controllerutil.RemoveFinalizer(&workerDeploy, twdFinalizerName) + controllerutil.RemoveFinalizer(&workerDeploy, finalizerName) if err := r.Update(ctx, &workerDeploy); err != nil { return ctrl.Result{}, err } @@ -175,8 +172,8 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req } // Ensure finalizer is present on non-deleted resources - if !controllerutil.ContainsFinalizer(&workerDeploy, twdFinalizerName) { - controllerutil.AddFinalizer(&workerDeploy, twdFinalizerName) + if !controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) { + controllerutil.AddFinalizer(&workerDeploy, finalizerName) if err := r.Update(ctx, &workerDeploy); err != nil { return ctrl.Result{}, err } @@ -614,9 +611,9 @@ func (r *TemporalWorkerDeploymentReconciler) ensureConnectionFinalizer( l logr.Logger, tc *temporaliov1alpha1.TemporalConnection, ) error { - if !controllerutil.ContainsFinalizer(tc, tcFinalizerName) { + if !controllerutil.ContainsFinalizer(tc, finalizerName) { l.Info("Adding finalizer to TemporalConnection", "connection", tc.Name) - controllerutil.AddFinalizer(tc, tcFinalizerName) + controllerutil.AddFinalizer(tc, finalizerName) if err := r.Update(ctx, tc); err != nil { return fmt.Errorf("unable to add finalizer to TemporalConnection %q: %w", tc.Name, err) } @@ -664,9 +661,9 @@ func (r *TemporalWorkerDeploymentReconciler) removeConnectionFinalizerIfUnused( return fmt.Errorf("unable to fetch TemporalConnection %q: %w", connectionName, err) } - if controllerutil.ContainsFinalizer(&tc, tcFinalizerName) { + if controllerutil.ContainsFinalizer(&tc, finalizerName) { l.Info("Removing finalizer from TemporalConnection", "connection", connectionName) - controllerutil.RemoveFinalizer(&tc, tcFinalizerName) + controllerutil.RemoveFinalizer(&tc, finalizerName) if err := r.Update(ctx, &tc); err != nil { return fmt.Errorf("unable to remove finalizer from TemporalConnection %q: %w", connectionName, err) } From 8d914944d3875b49ff62d88e2072cbd112c5756b Mon Sep 17 00:00:00 2001 From: Anuj Agrawal Date: Thu, 26 Mar 2026 12:30:49 +0530 Subject: [PATCH 5/6] fix: integration tests Signed-off-by: Anuj Agrawal --- .../internal/deletion_integration_test.go | 314 ++++++++++++++++++ internal/tests/internal/integration_test.go | 4 + 2 files changed, 318 insertions(+) create mode 100644 internal/tests/internal/deletion_integration_test.go diff --git a/internal/tests/internal/deletion_integration_test.go b/internal/tests/internal/deletion_integration_test.go new file mode 100644 index 00000000..05c35cbd --- /dev/null +++ b/internal/tests/internal/deletion_integration_test.go @@ -0,0 +1,314 @@ +package internal + +// Tests that deleting a TemporalWorkerDeployment CRD correctly cleans up +// Temporal server-side versioning data and handles edge cases like the +// TemporalConnection being deleted simultaneously by Helm. +// +// Covered: +// - TWD deletion sets current version to unversioned on Temporal server +// - TWD deletion removes finalizer from TemporalConnection when no other TWDs reference it +// - TWD is fully deleted from K8s after cleanup (finalizer removed) +// - TWD deletion with TemporalConnection deleted simultaneously (Helm race condition) still succeeds + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/k8s" + "github.com/temporalio/temporal-worker-controller/internal/testhelpers" + "go.temporal.io/api/serviceerror" + sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/server/temporaltest" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const deletionFinalizerName = "temporal.io/delete-protection" + +func runDeletionTests( + t *testing.T, + k8sClient client.Client, + mgr manager.Manager, + ts *temporaltest.TestServer, + testNamespace string, +) { + t.Run("deletion-sets-current-to-unversioned", func(t *testing.T) { + testDeletionSetsCurrentToUnversioned(t, k8sClient, mgr, ts, testNamespace) + }) + + t.Run("deletion-removes-connection-finalizer", func(t *testing.T) { + testDeletionRemovesConnectionFinalizer(t, k8sClient, mgr, ts, testNamespace) + }) +} + +// testDeletionSetsCurrentToUnversioned verifies the core fix: when a TWD is deleted, +// the controller sets the current version to unversioned so tasks route to unversioned workers, +// and the TWD is fully deleted from K8s. +func testDeletionSetsCurrentToUnversioned( + t *testing.T, + k8sClient client.Client, + mgr manager.Manager, + ts *temporaltest.TestServer, + namespace string, +) { + ctx := context.Background() + testName := "del-cleanup" + + // Build a TWD using the standard builder pattern + tc := testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithTargetTemplate("v1.0"), + ). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). + WithCurrentVersion("v1.0", true, false), + ). + BuildWithValues(testName, namespace, ts.GetDefaultNamespace()) + + twd := tc.GetTWD() + + // Create a TemporalConnection + temporalConnection := &temporaliov1alpha1.TemporalConnection{ + ObjectMeta: metav1.ObjectMeta{ + Name: twd.Spec.WorkerOptions.TemporalConnectionRef.Name, + Namespace: namespace, + }, + Spec: temporaliov1alpha1.TemporalConnectionSpec{ + HostPort: ts.GetFrontendHostPort(), + }, + } + if err := k8sClient.Create(ctx, temporalConnection); err != nil { + t.Fatalf("failed to create TemporalConnection: %v", err) + } + + // Create the TWD + if err := k8sClient.Create(ctx, twd); err != nil { + t.Fatalf("failed to create TWD: %v", err) + } + + // Wait for the child deployment to be created by the controller + workerDeploymentName := k8s.ComputeWorkerDeploymentName(twd) + buildID := k8s.ComputeBuildID(twd) + expectedDeploymentName := k8s.ComputeVersionedDeploymentName(twd.Name, buildID) + + eventually(t, 30*time.Second, time.Second, func() error { + var dep appsv1.Deployment + return k8sClient.Get(ctx, types.NamespacedName{ + Name: expectedDeploymentName, Namespace: namespace, + }, &dep) + }) + + // Start workers so the version registers on the Temporal server + env := testhelpers.TestEnv{ + K8sClient: k8sClient, + Mgr: mgr, + Ts: ts, + Connection: temporalConnection, + } + workerStopFuncs := applyDeployment(t, ctx, k8sClient, expectedDeploymentName, namespace) + defer handleStopFuncs(workerStopFuncs) + + // Wait until the version becomes current on the Temporal server + deploymentHandle := ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) + eventually(t, 60*time.Second, 2*time.Second, func() error { + resp, err := deploymentHandle.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{}) + if err != nil { + return err + } + if resp.Info.RoutingConfig.CurrentVersion == nil { + return errors.New("current version not set yet") + } + return nil + }) + t.Log("TWD is reconciled with a current version set") + + // Verify the TWD has our finalizer + var twdBeforeDelete temporaliov1alpha1.TemporalWorkerDeployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: twd.Name, Namespace: namespace}, &twdBeforeDelete); err != nil { + t.Fatalf("failed to get TWD: %v", err) + } + hasFinalizer := false + for _, f := range twdBeforeDelete.Finalizers { + if f == deletionFinalizerName { + hasFinalizer = true + break + } + } + if !hasFinalizer { + t.Fatalf("TWD does not have expected finalizer %q", deletionFinalizerName) + } + + // Delete the TWD + t.Log("Deleting the TemporalWorkerDeployment") + if err := k8sClient.Delete(ctx, &twdBeforeDelete); err != nil { + t.Fatalf("failed to delete TWD: %v", err) + } + + // Verify the TWD is eventually deleted (finalizer ran and was removed) + eventually(t, 60*time.Second, 2*time.Second, func() error { + var check temporaliov1alpha1.TemporalWorkerDeployment + err := k8sClient.Get(ctx, types.NamespacedName{Name: twd.Name, Namespace: namespace}, &check) + if err != nil { + return nil // not found = deleted + } + return errors.New("TWD still exists, finalizer may not have completed") + }) + t.Log("TWD deleted successfully (finalizer completed)") + + // Verify Temporal server-side state: current version should be unversioned + resp, err := deploymentHandle.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{}) + if err != nil { + var notFound *serviceerror.NotFound + if errors.As(err, ¬Found) { + t.Log("Worker Deployment was fully deleted from Temporal server") + return + } + t.Fatalf("failed to describe worker deployment after deletion: %v", err) + } + + if resp.Info.RoutingConfig.CurrentVersion != nil { + t.Errorf("expected current version to be nil (unversioned) after TWD deletion, got buildID=%q", + resp.Info.RoutingConfig.CurrentVersion.BuildID) + } else { + t.Log("Verified: current version is unversioned after TWD deletion") + } + + // Suppress unused variable warning for env + _ = env +} + +// testDeletionRemovesConnectionFinalizer verifies that when a TWD is deleted, +// the controller removes its finalizer from the TemporalConnection, allowing +// the connection to be deleted by K8s. This tests the Helm race condition fix. +func testDeletionRemovesConnectionFinalizer( + t *testing.T, + k8sClient client.Client, + mgr manager.Manager, + ts *temporaltest.TestServer, + namespace string, +) { + ctx := context.Background() + testName := "del-conn-finalizer" + + // Build a TWD with manual strategy (simpler, no need to reach current version) + tc := testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithManualStrategy(). + WithTargetTemplate("v1.0"), + ). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusInactive, -1, true, false), + ). + BuildWithValues(testName, namespace, ts.GetDefaultNamespace()) + + twd := tc.GetTWD() + + // Create a TemporalConnection + temporalConnection := &temporaliov1alpha1.TemporalConnection{ + ObjectMeta: metav1.ObjectMeta{ + Name: twd.Spec.WorkerOptions.TemporalConnectionRef.Name, + Namespace: namespace, + }, + Spec: temporaliov1alpha1.TemporalConnectionSpec{ + HostPort: ts.GetFrontendHostPort(), + }, + } + if err := k8sClient.Create(ctx, temporalConnection); err != nil { + t.Fatalf("failed to create TemporalConnection: %v", err) + } + + // Create the TWD + if err := k8sClient.Create(ctx, twd); err != nil { + t.Fatalf("failed to create TWD: %v", err) + } + + // Wait for the finalizer to be added to both TWD and TemporalConnection + eventually(t, 30*time.Second, time.Second, func() error { + var check temporaliov1alpha1.TemporalWorkerDeployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: twd.Name, Namespace: namespace}, &check); err != nil { + return err + } + for _, f := range check.Finalizers { + if f == deletionFinalizerName { + return nil + } + } + return fmt.Errorf("TWD finalizer %q not yet added", deletionFinalizerName) + }) + + eventually(t, 30*time.Second, time.Second, func() error { + var check temporaliov1alpha1.TemporalConnection + if err := k8sClient.Get(ctx, types.NamespacedName{Name: temporalConnection.Name, Namespace: namespace}, &check); err != nil { + return err + } + for _, f := range check.Finalizers { + if f == deletionFinalizerName { + return nil + } + } + return fmt.Errorf("TemporalConnection finalizer %q not yet added", deletionFinalizerName) + }) + t.Log("Both finalizers are in place") + + // Simulate Helm deleting both resources simultaneously by deleting the + // TemporalConnection first, then the TWD. The connection should be blocked + // by the finalizer until the TWD cleanup removes it. + if err := k8sClient.Delete(ctx, temporalConnection); err != nil { + t.Fatalf("failed to delete TemporalConnection: %v", err) + } + t.Log("TemporalConnection deletion requested (blocked by finalizer)") + + // Verify the connection is NOT yet deleted (finalizer holds it) + var connCheck temporaliov1alpha1.TemporalConnection + if err := k8sClient.Get(ctx, types.NamespacedName{Name: temporalConnection.Name, Namespace: namespace}, &connCheck); err != nil { + t.Fatalf("TemporalConnection should still exist (held by finalizer), but got: %v", err) + } + if connCheck.DeletionTimestamp.IsZero() { + t.Fatal("TemporalConnection should have DeletionTimestamp set") + } + t.Log("Verified: TemporalConnection is in Terminating state (held by finalizer)") + + // Now delete the TWD + var latestTwd temporaliov1alpha1.TemporalWorkerDeployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: twd.Name, Namespace: namespace}, &latestTwd); err != nil { + t.Fatalf("failed to get TWD: %v", err) + } + if err := k8sClient.Delete(ctx, &latestTwd); err != nil { + t.Fatalf("failed to delete TWD: %v", err) + } + + // Verify the TWD is eventually deleted + eventually(t, 60*time.Second, 2*time.Second, func() error { + var check temporaliov1alpha1.TemporalWorkerDeployment + err := k8sClient.Get(ctx, types.NamespacedName{Name: twd.Name, Namespace: namespace}, &check) + if err != nil { + return nil // deleted + } + return errors.New("TWD still exists") + }) + t.Log("TWD deleted successfully") + + // Verify the TemporalConnection is also eventually deleted + // (controller removed the finalizer during TWD cleanup, K8s can now delete it) + eventually(t, 60*time.Second, 2*time.Second, func() error { + var check temporaliov1alpha1.TemporalConnection + err := k8sClient.Get(ctx, types.NamespacedName{Name: temporalConnection.Name, Namespace: namespace}, &check) + if err != nil { + return nil // deleted + } + return errors.New("TemporalConnection still exists after TWD cleanup") + }) + t.Log("TemporalConnection deleted successfully (finalizer was removed by TWD cleanup)") +} diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 53fe2dd6..07ba3045 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -949,6 +949,10 @@ func TestIntegration(t *testing.T) { // Conditions and events tests runConditionsAndEventsTests(t, k8sClient, mgr, ts, testNamespace.Name) + + // Deletion cleanup tests + runDeletionTests(t, k8sClient, mgr, ts, testNamespace.Name) + } // testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status From 9fd0c740bfa25a367e8f5b194dd18675bf7a7890 Mon Sep 17 00:00:00 2001 From: Anuj Agrawal Date: Wed, 22 Apr 2026 23:37:22 +0530 Subject: [PATCH 6/6] fix: add deletion timeout, strict retry, and RBAC markers for connection finalizer - Add 5-minute deletionCleanupTimeout to prevent TWD stuck in Terminating state indefinitely if Temporal server is unavailable - Return errors from version/deployment deletion to trigger requeue until versions actually clear (pollers disappear as pods terminate) - Add update/patch verbs and finalizers RBAC marker for TemporalConnections - Fix comment-spacing lint on new kubebuilder:rbac markers --- internal/controller/worker_controller.go | 32 ++++++++++++++++-------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 63d940be..3ee60170 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -51,6 +51,12 @@ const ( // ensures Temporal server-side versioning data is cleaned up. On TemporalConnection // resources, it prevents deletion while any TWD still references the connection. finalizerName = "temporal.io/delete-protection" + + // deletionCleanupTimeout is the maximum duration to retry Temporal server-side + // cleanup before giving up and allowing the K8s resource to be deleted. + // This prevents the TWD from being stuck in Terminating state indefinitely + // if the Temporal server is unavailable or a version has persistent active pollers. + deletionCleanupTimeout = 5 * time.Minute ) // getAPIKeySecretName extracts the secret name from a SecretKeySelector @@ -108,8 +114,8 @@ type TemporalWorkerDeploymentReconciler struct { //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update -//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch -//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update +// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update @@ -151,8 +157,14 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req if controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) { l.Info("TemporalWorkerDeployment is being deleted, running cleanup") if err := r.handleDeletion(ctx, l, &workerDeploy); err != nil { - l.Error(err, "failed to clean up Temporal server-side deployment data") - return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + elapsed := time.Since(workerDeploy.DeletionTimestamp.Time) + if elapsed < deletionCleanupTimeout { + l.Error(err, "failed to clean up Temporal server-side deployment data, will retry", + "elapsed", elapsed.Round(time.Second), "timeout", deletionCleanupTimeout) + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + l.Error(err, "failed to clean up Temporal server-side deployment data after timeout, proceeding with finalizer removal", + "elapsed", elapsed.Round(time.Second), "timeout", deletionCleanupTimeout) } // Remove our finalizer from the TemporalConnection if no other TWDs reference it. @@ -490,6 +502,9 @@ func (r *TemporalWorkerDeploymentReconciler) handleDeletion( // Step 3: Delete versions that are eligible. Versions that are still draining // are force-deleted with SkipDrainage since the TWD is being removed entirely. + // If any version fails to delete (e.g. active pollers), return an error so the + // reconciler requeues. Pollers disappear once pods terminate and the next + // reconciliation will succeed. for _, version := range resp.Info.VersionSummaries { buildID := version.Version.BuildID l.Info("Deleting worker deployment version", "buildID", buildID) @@ -498,20 +513,17 @@ func (r *TemporalWorkerDeploymentReconciler) handleDeletion( SkipDrainage: true, Identity: getControllerIdentity(), }); err != nil { - // Log but don't fail -- the version may still have pollers or be current. - // Temporal will garbage collect it eventually. - l.Info("Could not delete version (will be garbage collected)", "buildID", buildID, "error", err) + return fmt.Errorf("unable to delete version %s (will retry): %w", buildID, err) } } - // Step 4: Attempt to delete the deployment itself. This only succeeds if all versions are gone. + // Step 4: Delete the deployment itself. This only succeeds if all versions are gone. l.Info("Attempting to delete worker deployment from Temporal server", "name", workerDeploymentName) if _, err := temporalClient.WorkerDeploymentClient().Delete(ctx, sdkclient.WorkerDeploymentDeleteOptions{ Name: workerDeploymentName, Identity: getControllerIdentity(), }); err != nil { - // Non-fatal: deployment will be garbage collected once all versions drain. - l.Info("Could not delete worker deployment (will be garbage collected)", "name", workerDeploymentName, "error", err) + return fmt.Errorf("unable to delete worker deployment %s (will retry): %w", workerDeploymentName, err) } return nil