diff --git a/helm/temporal-worker-controller/templates/rbac.yaml b/helm/temporal-worker-controller/templates/rbac.yaml index b73faca5..7175029a 100644 --- a/helm/temporal-worker-controller/templates/rbac.yaml +++ b/helm/temporal-worker-controller/templates/rbac.yaml @@ -96,7 +96,15 @@ rules: verbs: - get - list + - patch + - update - watch + - apiGroups: + - temporal.io + resources: + - temporalconnections/finalizers + verbs: + - update - apiGroups: - temporal.io resources: diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 25f2f533..3ee60170 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -10,10 +10,13 @@ import ( "fmt" "time" + "github.com/go-logr/logr" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/temporal" + "go.temporal.io/api/serviceerror" + sdkclient "go.temporal.io/sdk/client" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -25,6 +28,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -41,6 +45,18 @@ const ( // wrtWorkerRefKey is the field index key for WorkerResourceTemplate by temporalWorkerDeploymentRef.name. wrtWorkerRefKey = ".spec.temporalWorkerDeploymentRef.name" + + // finalizerName is the finalizer added to TemporalWorkerDeployment and TemporalConnection + // resources to prevent deletion before cleanup actions are taken. 
On TWD resources, it + // ensures Temporal server-side versioning data is cleaned up. On TemporalConnection + // resources, it prevents deletion while any TWD still references the connection. + finalizerName = "temporal.io/delete-protection" + + // deletionCleanupTimeout is the maximum duration to retry Temporal server-side + // cleanup before giving up and allowing the K8s resource to be deleted. + // This prevents the TWD from being stuck in Terminating state indefinitely + // if the Temporal server is unavailable or a version has persistent active pollers. + deletionCleanupTimeout = 5 * time.Minute ) // getAPIKeySecretName extracts the secret name from a SecretKeySelector @@ -98,7 +114,8 @@ type TemporalWorkerDeploymentReconciler struct { //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update -//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch +// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch +// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update //+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update @@ -133,6 +150,47 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, r.markWRTsTWDNotFound(ctx, req.NamespacedName) } + // Handle deletion: clean up Temporal server-side versioning data before allowing + // the CRD to be deleted. 
Without this, stale build ID routing persists in Temporal + // and prevents unversioned workers from picking up tasks on the same task queue. + if !workerDeploy.DeletionTimestamp.IsZero() { + if controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) { + l.Info("TemporalWorkerDeployment is being deleted, running cleanup") + if err := r.handleDeletion(ctx, l, &workerDeploy); err != nil { + elapsed := time.Since(workerDeploy.DeletionTimestamp.Time) + if elapsed < deletionCleanupTimeout { + l.Error(err, "failed to clean up Temporal server-side deployment data, will retry", + "elapsed", elapsed.Round(time.Second), "timeout", deletionCleanupTimeout) + return ctrl.Result{RequeueAfter: 10 * time.Second}, nil + } + l.Error(err, "failed to clean up Temporal server-side deployment data after timeout, proceeding with finalizer removal", + "elapsed", elapsed.Round(time.Second), "timeout", deletionCleanupTimeout) + } + + // Remove our finalizer from the TemporalConnection if no other TWDs reference it. 
+ if err := r.removeConnectionFinalizerIfUnused(ctx, l, &workerDeploy); err != nil { + l.Error(err, "failed to remove finalizer from TemporalConnection") + return ctrl.Result{}, err + } + + // Cleanup succeeded, remove the finalizer so K8s can delete the resource + controllerutil.RemoveFinalizer(&workerDeploy, finalizerName) + if err := r.Update(ctx, &workerDeploy); err != nil { + return ctrl.Result{}, err + } + l.Info("Temporal server-side cleanup complete, finalizer removed") + } + return ctrl.Result{}, nil + } + + // Ensure finalizer is present on non-deleted resources + if !controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) { + controllerutil.AddFinalizer(&workerDeploy, finalizerName) + if err := r.Update(ctx, &workerDeploy); err != nil { + return ctrl.Result{}, err + } + } + // TODO(jlegrone): Set defaults via webhook rather than manually if err := workerDeploy.Default(ctx, &workerDeploy); err != nil { l.Error(err, "TemporalWorkerDeployment defaulter failed") @@ -164,6 +222,13 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req return ctrl.Result{}, err } + // Ensure our finalizer is on the TemporalConnection so it cannot be deleted + // while this TWD still references it. This guarantees the connection is available + // during TWD deletion cleanup. + if err := r.ensureConnectionFinalizer(ctx, l, &temporalConnection); err != nil { + return ctrl.Result{}, err + } + // Get the Auth Mode and Secret Name authMode, secretName, err := resolveAuthSecretName(&temporalConnection) if err != nil { @@ -336,6 +401,134 @@ func (r *TemporalWorkerDeploymentReconciler) markWRTsTWDNotFound(ctx context.Con return errors.Join(errs...) } +// handleDeletion cleans up Temporal server-side deployment versioning data when +// a TemporalWorkerDeployment CRD is deleted. This prevents stale build ID routing +// from blocking unversioned workers on the same task queue. +// +// The cleanup sequence: +// 1. 
Set the current version to "unversioned" (empty BuildID) so new tasks route to unversioned workers
+// 2. Clear the ramping version, if one is set (best effort: a failure is only logged)
+// 3. Delete every version listed in the deployment's version summaries, skipping drainage
+// 4. Delete the Worker Deployment itself from the Temporal server
+//
+// A Worker Deployment or version that the server reports as not found is treated as
+// already cleaned up. Any other failure is returned; Reconcile requeues and retries
+// until deletionCleanupTimeout has elapsed since the deletion timestamp.
+func (r *TemporalWorkerDeploymentReconciler) handleDeletion(
+	ctx context.Context,
+	l logr.Logger,
+	workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
+) error {
+	identity := getControllerIdentity()
+	temporalNamespace := workerDeploy.Spec.WorkerOptions.TemporalNamespace
+
+	// isNotFound reports whether err wraps a Temporal NotFound service error.
+	isNotFound := func(err error) bool {
+		var notFound *serviceerror.NotFound
+		return errors.As(err, &notFound)
+	}
+
+	// Resolve the TemporalConnection referenced by this TWD. Reconcile keeps a finalizer
+	// on the connection while a TWD references it, so it is expected to still exist here.
+	var temporalConnection temporaliov1alpha1.TemporalConnection
+	if err := r.Get(ctx, types.NamespacedName{
+		Name:      workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name,
+		Namespace: workerDeploy.Namespace,
+	}, &temporalConnection); err != nil {
+		return fmt.Errorf("unable to fetch TemporalConnection: %w", err)
+	}
+
+	authMode, secretName, err := resolveAuthSecretName(&temporalConnection)
+	if err != nil {
+		return fmt.Errorf("unable to resolve auth secret name: %w", err)
+	}
+
+	// Reuse a pooled SDK client when one exists for this connection; otherwise create
+	// one through the pool.
+	temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
+		HostPort:   temporalConnection.Spec.HostPort,
+		Namespace:  temporalNamespace,
+		SecretName: secretName,
+		AuthMode:   authMode,
+	})
+	if !ok {
+		clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{
+			K8sNamespace:      workerDeploy.Namespace,
+			TemporalNamespace: temporalNamespace,
+			Spec:              temporalConnection.Spec,
+			Identity:          identity,
+		})
+		if err != nil {
+			return fmt.Errorf("unable to parse Temporal auth secret: %w", err)
+		}
+		c, err := r.TemporalClientPool.DialAndUpsertClient(*clientOpts, *key, *clientAuth)
+		if err != nil {
+			return fmt.Errorf("unable to create TemporalClient: %w", err)
+		}
+		temporalClient = c
+	}
+
+	workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
+	deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(workerDeploymentName)
+
+	// Describe the deployment to get current state
+	resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
+	if err != nil {
+		if isNotFound(err) {
+			l.Info("Worker Deployment not found on Temporal server, nothing to clean up")
+			return nil
+		}
+		return fmt.Errorf("unable to describe worker deployment: %w", err)
+	}
+
+	routingConfig := resp.Info.RoutingConfig
+
+	// Step 1: Set current version to unversioned (empty BuildID) so tasks route to unversioned workers.
+	// This is the critical step that unblocks task dispatch.
+	if routingConfig.CurrentVersion != nil {
+		l.Info("Setting current version to unversioned", "previousBuildID", routingConfig.CurrentVersion.BuildID)
+		if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
+			BuildID:                 "", // empty = unversioned
+			ConflictToken:           resp.ConflictToken,
+			Identity:                identity,
+			IgnoreMissingTaskQueues: true,
+		}); err != nil {
+			return fmt.Errorf("unable to set current version to unversioned: %w", err)
+		}
+		l.Info("Successfully set current version to unversioned")
+	} else {
+		l.Info("No current version set, skipping unversioned redirect")
+	}
+
+	// Step 2: If there's a ramping version, clear it. This is deliberately best effort:
+	// a failure is logged and cleanup continues with version deletion.
+	if routingConfig.RampingVersion != nil {
+		l.Info("Clearing ramping version", "buildID", routingConfig.RampingVersion.BuildID)
+		if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
+			BuildID:    "",
+			Percentage: 0,
+			Identity:   identity,
+		}); err != nil {
+			l.Info("Failed to clear ramping version (may have been cleared by SetCurrentVersion)", "error", err)
+		}
+	} else {
+		l.Info("No ramping version set, skipping clear ramping version")
+	}
+
+	// Step 3: Delete every version from the Describe response. Versions that are still
+	// draining are force-deleted with SkipDrainage since the TWD is being removed entirely.
+	// If a version fails to delete (e.g. active pollers), return an error so the
+	// reconciler requeues. A version that is already gone counts as deleted.
+	for _, version := range resp.Info.VersionSummaries {
+		buildID := version.Version.BuildID
+		l.Info("Deleting worker deployment version", "buildID", buildID)
+		_, err := deploymentHandler.DeleteVersion(ctx, sdkclient.WorkerDeploymentDeleteVersionOptions{
+			BuildID:      buildID,
+			SkipDrainage: true,
+			Identity:     identity,
+		})
+		if err != nil && !isNotFound(err) {
+			return fmt.Errorf("unable to delete version %s (will retry): %w", buildID, err)
+		}
+	}
+
+	// Step 4: Delete the deployment itself. A deployment that is already gone counts
+	// as deleted.
+	l.Info("Attempting to delete worker deployment from Temporal server", "name", workerDeploymentName)
+	_, err = temporalClient.WorkerDeploymentClient().Delete(ctx, sdkclient.WorkerDeploymentDeleteOptions{
+		Name:     workerDeploymentName,
+		Identity: identity,
+	})
+	if err != nil && !isNotFound(err) {
+		return fmt.Errorf("unable to delete worker deployment %s (will retry): %w", workerDeploymentName, err)
+	}
+
+	return nil
+}
+
 // setCondition sets a condition on the TemporalWorkerDeployment status.
func (r *TemporalWorkerDeploymentReconciler) setCondition(
 	workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
@@ -423,6 +616,74 @@ func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetBlocked(
 	_ = r.Status().Update(ctx, workerDeploy)
 }
 
+// ensureConnectionFinalizer adds our finalizer to the TemporalConnection so it
+// cannot be deleted while this TWD still needs it for cleanup.
+//
+// It is a no-op when the finalizer is already present. When the connection is
+// already being deleted and does not carry the finalizer, nothing is added: the
+// API server rejects new finalizers on a terminating object, so attempting the
+// update would only make every reconcile fail.
+//
+// Returns an error only when the update that adds the finalizer fails.
+func (r *TemporalWorkerDeploymentReconciler) ensureConnectionFinalizer(
+	ctx context.Context,
+	l logr.Logger,
+	tc *temporaliov1alpha1.TemporalConnection,
+) error {
+	if controllerutil.ContainsFinalizer(tc, finalizerName) {
+		return nil
+	}
+	if !tc.DeletionTimestamp.IsZero() {
+		l.Info("TemporalConnection is being deleted, not adding finalizer", "connection", tc.Name)
+		return nil
+	}
+
+	l.Info("Adding finalizer to TemporalConnection", "connection", tc.Name)
+	controllerutil.AddFinalizer(tc, finalizerName)
+	if err := r.Update(ctx, tc); err != nil {
+		return fmt.Errorf("unable to add finalizer to TemporalConnection %q: %w", tc.Name, err)
+	}
+	return nil
+}
+
+// removeConnectionFinalizerIfUnused removes our finalizer from the TemporalConnection
+// if no other TWDs (besides the one being deleted) still reference it.
+func (r *TemporalWorkerDeploymentReconciler) removeConnectionFinalizerIfUnused(
+	ctx context.Context,
+	l logr.Logger,
+	deletingTWD *temporaliov1alpha1.TemporalWorkerDeployment,
+) error {
+	ns := deletingTWD.Namespace
+	connectionName := deletingTWD.Spec.WorkerOptions.TemporalConnectionRef.Name
+
+	// Look at every TWD in the namespace of the one being deleted.
+	var siblings temporaliov1alpha1.TemporalWorkerDeploymentList
+	if err := r.List(ctx, &siblings, client.InNamespace(ns)); err != nil {
+		return fmt.Errorf("unable to list TWDs: %w", err)
+	}
+
+	// The finalizer stays as soon as one TWD with a different name points at the
+	// same connection.
+	// NOTE(review): TWDs that are themselves terminating still count as references
+	// here; confirm that simultaneous deletions cannot leave the finalizer behind.
+	for idx := range siblings.Items {
+		other := &siblings.Items[idx]
+		if other.Name != deletingTWD.Name &&
+			other.Spec.WorkerOptions.TemporalConnectionRef.Name == connectionName {
+			l.Info("TemporalConnection still referenced by another TWD, keeping finalizer",
+				"connection", connectionName, "referencedBy", other.Name)
+			return nil
+		}
+	}
+
+	// Nobody else uses the connection: fetch it and drop our finalizer.
+	var conn temporaliov1alpha1.TemporalConnection
+	err := r.Get(ctx, types.NamespacedName{Name: connectionName, Namespace: ns}, &conn)
+	switch {
+	case apierrors.IsNotFound(err):
+		return nil // already gone
+	case err != nil:
+		return fmt.Errorf("unable to fetch TemporalConnection %q: %w", connectionName, err)
+	}
+
+	if !controllerutil.ContainsFinalizer(&conn, finalizerName) {
+		return nil
+	}
+
+	l.Info("Removing finalizer from TemporalConnection", "connection", connectionName)
+	controllerutil.RemoveFinalizer(&conn, finalizerName)
+	if err := r.Update(ctx, &conn); err != nil {
+		return fmt.Errorf("unable to remove finalizer from TemporalConnection %q: %w", connectionName, err)
+	}
+	return nil
+}
+
 // SetupWithManager sets up the controller with the Manager.
func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string {
diff --git a/internal/tests/internal/deletion_integration_test.go b/internal/tests/internal/deletion_integration_test.go
new file mode 100644
index 00000000..05c35cbd
--- /dev/null
+++ b/internal/tests/internal/deletion_integration_test.go
@@ -0,0 +1,314 @@
+package internal
+
+// Tests that deleting a TemporalWorkerDeployment CRD correctly cleans up
+// Temporal server-side versioning data and handles edge cases like the
+// TemporalConnection being deleted simultaneously by Helm.
+//
+// Covered:
+//   - TWD deletion sets current version to unversioned on Temporal server
+//   - TWD deletion removes finalizer from TemporalConnection when no other TWDs reference it
+//   - TWD is fully deleted from K8s after cleanup (finalizer removed)
+//   - TWD deletion with TemporalConnection deleted simultaneously (Helm race condition) still succeeds
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"testing"
+	"time"
+
+	temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
+	"github.com/temporalio/temporal-worker-controller/internal/k8s"
+	"github.com/temporalio/temporal-worker-controller/internal/testhelpers"
+	"go.temporal.io/api/serviceerror"
+	sdkclient "go.temporal.io/sdk/client"
+	"go.temporal.io/server/temporaltest"
+	appsv1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+// deletionFinalizerName is the finalizer string these tests look for on
+// TemporalWorkerDeployment and TemporalConnection objects.
+const deletionFinalizerName = "temporal.io/delete-protection"
+
+// runDeletionTests runs the deletion-cleanup scenarios as subtests, passing the
+// shared Kubernetes client, manager, Temporal test server and namespace to each.
+func runDeletionTests(
+	t *testing.T,
+	k8sClient client.Client,
+	mgr manager.Manager,
+	ts *temporaltest.TestServer,
+	testNamespace string,
+) {
+	t.Run("deletion-sets-current-to-unversioned", func(t *testing.T) {
+		testDeletionSetsCurrentToUnversioned(t, k8sClient, mgr, ts, testNamespace)
+	})
+
+	t.Run("deletion-removes-connection-finalizer", func(t *testing.T) {
+		testDeletionRemovesConnectionFinalizer(t, k8sClient, mgr, ts, testNamespace)
+	})
+}
+
+// testDeletionSetsCurrentToUnversioned verifies the core fix: when a TWD is deleted,
+// the controller sets the current version to unversioned so tasks route to unversioned workers,
+// and the TWD is fully deleted from K8s.
+//
+// The test passes if, after the TWD is gone, the Worker Deployment is either not
+// found on the Temporal server or has no current version.
+func testDeletionSetsCurrentToUnversioned(
+	t *testing.T,
+	k8sClient client.Client,
+	mgr manager.Manager,
+	ts *temporaltest.TestServer,
+	namespace string,
+) {
+	ctx := context.Background()
+	testName := "del-cleanup"
+
+	// Build a TWD using the standard builder pattern
+	tc := testhelpers.NewTestCase().
+		WithInput(
+			testhelpers.NewTemporalWorkerDeploymentBuilder().
+				WithAllAtOnceStrategy().
+				WithTargetTemplate("v1.0"),
+		).
+		WithExpectedStatus(
+			testhelpers.NewStatusBuilder().
+				WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusCurrent, -1, true, false).
+				WithCurrentVersion("v1.0", true, false),
+		).
+		BuildWithValues(testName, namespace, ts.GetDefaultNamespace())
+
+	twd := tc.GetTWD()
+	twdKey := types.NamespacedName{Name: twd.Name, Namespace: namespace}
+
+	// Create a TemporalConnection
+	temporalConnection := &temporaliov1alpha1.TemporalConnection{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      twd.Spec.WorkerOptions.TemporalConnectionRef.Name,
+			Namespace: namespace,
+		},
+		Spec: temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort: ts.GetFrontendHostPort(),
+		},
+	}
+	if err := k8sClient.Create(ctx, temporalConnection); err != nil {
+		t.Fatalf("failed to create TemporalConnection: %v", err)
+	}
+
+	// Create the TWD
+	if err := k8sClient.Create(ctx, twd); err != nil {
+		t.Fatalf("failed to create TWD: %v", err)
+	}
+
+	// Wait for the child deployment to be created by the controller
+	workerDeploymentName := k8s.ComputeWorkerDeploymentName(twd)
+	buildID := k8s.ComputeBuildID(twd)
+	expectedDeploymentName := k8s.ComputeVersionedDeploymentName(twd.Name, buildID)
+
+	eventually(t, 30*time.Second, time.Second, func() error {
+		var dep appsv1.Deployment
+		return k8sClient.Get(ctx, types.NamespacedName{
+			Name: expectedDeploymentName, Namespace: namespace,
+		}, &dep)
+	})
+
+	// Start workers so the version registers on the Temporal server
+	workerStopFuncs := applyDeployment(t, ctx, k8sClient, expectedDeploymentName, namespace)
+	defer handleStopFuncs(workerStopFuncs)
+
+	// Wait until the version becomes current on the Temporal server
+	deploymentHandle := ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName)
+	eventually(t, 60*time.Second, 2*time.Second, func() error {
+		resp, err := deploymentHandle.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
+		if err != nil {
+			return err
+		}
+		if resp.Info.RoutingConfig.CurrentVersion == nil {
+			return errors.New("current version not set yet")
+		}
+		return nil
+	})
+	t.Log("TWD is reconciled with a current version set")
+
+	// Verify the TWD has our finalizer
+	var twdBeforeDelete temporaliov1alpha1.TemporalWorkerDeployment
+	if err := k8sClient.Get(ctx, twdKey, &twdBeforeDelete); err != nil {
+		t.Fatalf("failed to get TWD: %v", err)
+	}
+	hasFinalizer := false
+	for _, f := range twdBeforeDelete.Finalizers {
+		if f == deletionFinalizerName {
+			hasFinalizer = true
+			break
+		}
+	}
+	if !hasFinalizer {
+		t.Fatalf("TWD does not have expected finalizer %q", deletionFinalizerName)
+	}
+
+	// Delete the TWD
+	t.Log("Deleting the TemporalWorkerDeployment")
+	if err := k8sClient.Delete(ctx, &twdBeforeDelete); err != nil {
+		t.Fatalf("failed to delete TWD: %v", err)
+	}
+
+	// Verify the TWD is eventually deleted (finalizer ran and was removed)
+	eventually(t, 60*time.Second, 2*time.Second, func() error {
+		var check temporaliov1alpha1.TemporalWorkerDeployment
+		err := k8sClient.Get(ctx, twdKey, &check)
+		if err == nil {
+			return errors.New("TWD still exists, finalizer may not have completed")
+		}
+		// Only NotFound proves the object is gone; any other error is reported
+		// instead of being mistaken for a successful deletion.
+		return client.IgnoreNotFound(err)
+	})
+	t.Log("TWD deleted successfully (finalizer completed)")
+
+	// Verify Temporal server-side state: current version should be unversioned
+	resp, err := deploymentHandle.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
+	if err != nil {
+		var notFound *serviceerror.NotFound
+		if errors.As(err, &notFound) {
+			t.Log("Worker Deployment was fully deleted from Temporal server")
+			return
+		}
+		t.Fatalf("failed to describe worker deployment after deletion: %v", err)
+	}
+
+	if resp.Info.RoutingConfig.CurrentVersion != nil {
+		t.Errorf("expected current version to be nil (unversioned) after TWD deletion, got buildID=%q",
+			resp.Info.RoutingConfig.CurrentVersion.BuildID)
+	} else {
+		t.Log("Verified: current version is unversioned after TWD deletion")
+	}
+}
+
+// testDeletionRemovesConnectionFinalizer verifies that when
a TWD is deleted,
+// the controller removes its finalizer from the TemporalConnection, allowing
+// the connection to be deleted by K8s. This tests the Helm race condition fix.
+//
+// Sequence: create the connection and the TWD, wait for both finalizers, request
+// deletion of the connection first (it must stay in Terminating state), then
+// delete the TWD and expect both objects to disappear.
+func testDeletionRemovesConnectionFinalizer(
+	t *testing.T,
+	k8sClient client.Client,
+	mgr manager.Manager,
+	ts *temporaltest.TestServer,
+	namespace string,
+) {
+	ctx := context.Background()
+	testName := "del-conn-finalizer"
+
+	// hasDeletionFinalizer reports whether the finalizer list contains ours.
+	hasDeletionFinalizer := func(finalizers []string) bool {
+		for _, f := range finalizers {
+			if f == deletionFinalizerName {
+				return true
+			}
+		}
+		return false
+	}
+
+	// Build a TWD with manual strategy (simpler, no need to reach current version)
+	tc := testhelpers.NewTestCase().
+		WithInput(
+			testhelpers.NewTemporalWorkerDeploymentBuilder().
+				WithManualStrategy().
+				WithTargetTemplate("v1.0"),
+		).
+		WithExpectedStatus(
+			testhelpers.NewStatusBuilder().
+				WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusInactive, -1, true, false),
+		).
+		BuildWithValues(testName, namespace, ts.GetDefaultNamespace())
+
+	twd := tc.GetTWD()
+	twdKey := types.NamespacedName{Name: twd.Name, Namespace: namespace}
+
+	// Create a TemporalConnection
+	temporalConnection := &temporaliov1alpha1.TemporalConnection{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      twd.Spec.WorkerOptions.TemporalConnectionRef.Name,
+			Namespace: namespace,
+		},
+		Spec: temporaliov1alpha1.TemporalConnectionSpec{
+			HostPort: ts.GetFrontendHostPort(),
+		},
+	}
+	connKey := types.NamespacedName{Name: temporalConnection.Name, Namespace: namespace}
+	if err := k8sClient.Create(ctx, temporalConnection); err != nil {
+		t.Fatalf("failed to create TemporalConnection: %v", err)
+	}
+
+	// Create the TWD
+	if err := k8sClient.Create(ctx, twd); err != nil {
+		t.Fatalf("failed to create TWD: %v", err)
+	}
+
+	// Wait for the finalizer to be added to both TWD and TemporalConnection
+	eventually(t, 30*time.Second, time.Second, func() error {
+		var check temporaliov1alpha1.TemporalWorkerDeployment
+		if err := k8sClient.Get(ctx, twdKey, &check); err != nil {
+			return err
+		}
+		if hasDeletionFinalizer(check.Finalizers) {
+			return nil
+		}
+		return fmt.Errorf("TWD finalizer %q not yet added", deletionFinalizerName)
+	})
+
+	eventually(t, 30*time.Second, time.Second, func() error {
+		var check temporaliov1alpha1.TemporalConnection
+		if err := k8sClient.Get(ctx, connKey, &check); err != nil {
+			return err
+		}
+		if hasDeletionFinalizer(check.Finalizers) {
+			return nil
+		}
+		return fmt.Errorf("TemporalConnection finalizer %q not yet added", deletionFinalizerName)
+	})
+	t.Log("Both finalizers are in place")
+
+	// Simulate Helm deleting both resources simultaneously by deleting the
+	// TemporalConnection first, then the TWD. The connection should be blocked
+	// by the finalizer until the TWD cleanup removes it.
+	if err := k8sClient.Delete(ctx, temporalConnection); err != nil {
+		t.Fatalf("failed to delete TemporalConnection: %v", err)
+	}
+	t.Log("TemporalConnection deletion requested (blocked by finalizer)")
+
+	// Verify the connection is NOT yet deleted (finalizer holds it)
+	var connCheck temporaliov1alpha1.TemporalConnection
+	if err := k8sClient.Get(ctx, connKey, &connCheck); err != nil {
+		t.Fatalf("TemporalConnection should still exist (held by finalizer), but got: %v", err)
+	}
+	if connCheck.DeletionTimestamp.IsZero() {
+		t.Fatal("TemporalConnection should have DeletionTimestamp set")
+	}
+	t.Log("Verified: TemporalConnection is in Terminating state (held by finalizer)")
+
+	// Now delete the TWD
+	var latestTwd temporaliov1alpha1.TemporalWorkerDeployment
+	if err := k8sClient.Get(ctx, twdKey, &latestTwd); err != nil {
+		t.Fatalf("failed to get TWD: %v", err)
+	}
+	if err := k8sClient.Delete(ctx, &latestTwd); err != nil {
+		t.Fatalf("failed to delete TWD: %v", err)
+	}
+
+	// Verify the TWD is eventually deleted. Only NotFound proves deletion; any
+	// other Get error is reported instead of being mistaken for success.
+	eventually(t, 60*time.Second, 2*time.Second, func() error {
+		var check temporaliov1alpha1.TemporalWorkerDeployment
+		err := k8sClient.Get(ctx, twdKey, &check)
+		if err == nil {
+			return errors.New("TWD still exists")
+		}
+		return client.IgnoreNotFound(err)
+	})
+	t.Log("TWD deleted successfully")
+
+	// Verify the TemporalConnection is also eventually deleted
+	// (controller removed the finalizer during TWD cleanup, K8s can now delete it)
+	eventually(t, 60*time.Second, 2*time.Second, func() error {
+		var check temporaliov1alpha1.TemporalConnection
+		err := k8sClient.Get(ctx, connKey, &check)
+		if err == nil {
+			return errors.New("TemporalConnection still exists after TWD cleanup")
+		}
+		return client.IgnoreNotFound(err)
+	})
+	t.Log("TemporalConnection deleted successfully (finalizer was removed by TWD cleanup)")
+}
diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go
index 53fe2dd6..07ba3045 100644
--- a/internal/tests/internal/integration_test.go
+++ b/internal/tests/internal/integration_test.go
@@ -949,6 +949,10 @@ func TestIntegration(t *testing.T) {
 
 	// Conditions and events tests
 	runConditionsAndEventsTests(t, k8sClient, mgr, ts, testNamespace.Name)
+
+	// Deletion cleanup tests
+	runDeletionTests(t, k8sClient, mgr, ts, testNamespace.Name)
+
 }
 
 // testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status