Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions helm/temporal-worker-controller/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ rules:
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- temporal.io
resources:
- temporalconnections/finalizers
verbs:
- update
- apiGroups:
- temporal.io
resources:
Expand Down
263 changes: 262 additions & 1 deletion internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
"go.temporal.io/api/serviceerror"
sdkclient "go.temporal.io/sdk/client"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -25,6 +28,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -41,6 +45,18 @@ const (

// wrtWorkerRefKey is the field index key for WorkerResourceTemplate by temporalWorkerDeploymentRef.name.
wrtWorkerRefKey = ".spec.temporalWorkerDeploymentRef.name"

// finalizerName is the finalizer added to TemporalWorkerDeployment and TemporalConnection
// resources to prevent deletion before cleanup actions are taken. On TWD resources, it
// ensures Temporal server-side versioning data is cleaned up. On TemporalConnection
// resources, it prevents deletion while any TWD still references the connection.
finalizerName = "temporal.io/delete-protection"

// deletionCleanupTimeout is the maximum duration to retry Temporal server-side
// cleanup before giving up and allowing the K8s resource to be deleted.
// This prevents the TWD from being stuck in Terminating state indefinitely
// if the Temporal server is unavailable or a version has persistent active pollers.
deletionCleanupTimeout = 5 * time.Minute
Comment on lines +55 to +59
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the goal of this finalizer is for the kubernets / TemporalWorkerDeployment perspective and the Temporal server perspective to be aligned. So if the server-side object was created by creating the k8s-side object, the server-side object should also be deleted in the same way.

This timeout breaks that expectation. If the server is temporarily unavailable and this finalizer gives up and deletes the k8s-side object, the server-side object would never be deleted.

I'm curious if you ran into this while testing? The default TTL for "active pollers" in server is 5 minutes, so if when the TWD enters Terminating state it is running active pollers, the controller needs to kill those Deployments and then wait 5 minutes before the "no active pollers" check passes. If all versions were Drained and the pods had been scaled down for a while, this delay wouldn't exist.

Because of that 5 minute poller TTL, a 5 minute deletionCleanupTimeout would frequently be used in case of deletion before natural scaledown, which would IMO not be good (because of the leftover server-side object thing I explained above).

If we decide to keep this, I would advocate for:

  • A very long threshold, like 1h
  • Only timing out on unavailable errors from the server, not precondition failed (which is the "active pollers" thing)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will defer to @jaypipes opinion here though!

)

// getAPIKeySecretName extracts the secret name from a SecretKeySelector
Expand Down Expand Up @@ -98,7 +114,8 @@ type TemporalWorkerDeploymentReconciler struct {
//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update
//+kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch
// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update
//+kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update
Expand Down Expand Up @@ -133,6 +150,47 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
return ctrl.Result{}, r.markWRTsTWDNotFound(ctx, req.NamespacedName)
}

// Handle deletion: clean up Temporal server-side versioning data before allowing
// the CRD to be deleted. Without this, stale build ID routing persists in Temporal
// and prevents unversioned workers from picking up tasks on the same task queue.
if !workerDeploy.DeletionTimestamp.IsZero() {
if controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) {
l.Info("TemporalWorkerDeployment is being deleted, running cleanup")
if err := r.handleDeletion(ctx, l, &workerDeploy); err != nil {
elapsed := time.Since(workerDeploy.DeletionTimestamp.Time)
if elapsed < deletionCleanupTimeout {
l.Error(err, "failed to clean up Temporal server-side deployment data, will retry",
"elapsed", elapsed.Round(time.Second), "timeout", deletionCleanupTimeout)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
l.Error(err, "failed to clean up Temporal server-side deployment data after timeout, proceeding with finalizer removal",
"elapsed", elapsed.Round(time.Second), "timeout", deletionCleanupTimeout)
}

// Remove our finalizer from the TemporalConnection if no other TWDs reference it.
if err := r.removeConnectionFinalizerIfUnused(ctx, l, &workerDeploy); err != nil {
l.Error(err, "failed to remove finalizer from TemporalConnection")
return ctrl.Result{}, err
}

// Cleanup succeeded, remove the finalizer so K8s can delete the resource
controllerutil.RemoveFinalizer(&workerDeploy, finalizerName)
if err := r.Update(ctx, &workerDeploy); err != nil {
return ctrl.Result{}, err
}
l.Info("Temporal server-side cleanup complete, finalizer removed")
}
return ctrl.Result{}, nil
}

// Ensure finalizer is present on non-deleted resources
if !controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) {
controllerutil.AddFinalizer(&workerDeploy, finalizerName)
if err := r.Update(ctx, &workerDeploy); err != nil {
return ctrl.Result{}, err
}
}

// TODO(jlegrone): Set defaults via webhook rather than manually
if err := workerDeploy.Default(ctx, &workerDeploy); err != nil {
l.Error(err, "TemporalWorkerDeployment defaulter failed")
Expand Down Expand Up @@ -164,6 +222,13 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
return ctrl.Result{}, err
}

// Ensure our finalizer is on the TemporalConnection so it cannot be deleted
// while this TWD still references it. This guarantees the connection is available
// during TWD deletion cleanup.
if err := r.ensureConnectionFinalizer(ctx, l, &temporalConnection); err != nil {
return ctrl.Result{}, err
}

// Get the Auth Mode and Secret Name
authMode, secretName, err := resolveAuthSecretName(&temporalConnection)
if err != nil {
Expand Down Expand Up @@ -336,6 +401,134 @@ func (r *TemporalWorkerDeploymentReconciler) markWRTsTWDNotFound(ctx context.Con
return errors.Join(errs...)
}

// handleDeletion cleans up Temporal server-side deployment versioning data when
// a TemporalWorkerDeployment CRD is deleted. This prevents stale build ID routing
// from blocking unversioned workers on the same task queue.
//
// The cleanup sequence:
// 1. Set the current version to "unversioned" (empty BuildID) so new tasks route to unversioned workers
// 2. Delete all non-current/non-ramping versions (drained/inactive ones)
// 3. The deployment itself will be garbage collected by Temporal once all versions are removed
func (r *TemporalWorkerDeploymentReconciler) handleDeletion(
ctx context.Context,
l logr.Logger,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
) error {
// Resolve Temporal connection.
// The TemporalConnection is guaranteed to exist because we hold a finalizer on it
// that prevents deletion while any TWD references it.
var temporalConnection temporaliov1alpha1.TemporalConnection
if err := r.Get(ctx, types.NamespacedName{
Name: workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name,
Namespace: workerDeploy.Namespace,
}, &temporalConnection); err != nil {
return fmt.Errorf("unable to fetch TemporalConnection: %w", err)
}

authMode, secretName, err := resolveAuthSecretName(&temporalConnection)
if err != nil {
return fmt.Errorf("unable to resolve auth secret name: %w", err)
}

temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
HostPort: temporalConnection.Spec.HostPort,
Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
SecretName: secretName,
AuthMode: authMode,
})
if !ok {
clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{
K8sNamespace: workerDeploy.Namespace,
TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
Spec: temporalConnection.Spec,
Identity: getControllerIdentity(),
})
if err != nil {
return fmt.Errorf("unable to parse Temporal auth secret: %w", err)
}
c, err := r.TemporalClientPool.DialAndUpsertClient(*clientOpts, *key, *clientAuth)
if err != nil {
return fmt.Errorf("unable to create TemporalClient: %w", err)
}
temporalClient = c
}

workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(workerDeploymentName)

// Describe the deployment to get current state
resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
l.Info("Worker Deployment not found on Temporal server, nothing to clean up")
return nil
}
return fmt.Errorf("unable to describe worker deployment: %w", err)
}

routingConfig := resp.Info.RoutingConfig

// Step 1: Set current version to unversioned (empty BuildID) so tasks route to unversioned workers.
// This is the critical step that unblocks task dispatch.
if routingConfig.CurrentVersion != nil {
l.Info("Setting current version to unversioned", "previousBuildID", routingConfig.CurrentVersion.BuildID)
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
BuildID: "", // empty = unversioned
ConflictToken: resp.ConflictToken,
Identity: getControllerIdentity(),
IgnoreMissingTaskQueues: true,
}); err != nil {
return fmt.Errorf("unable to set current version to unversioned: %w", err)
}
l.Info("Successfully set current version to unversioned")
} else {
l.Info("No current version set, skipping unversioned redirect")
}

// Step 2: If there's a ramping version, clear it.
if routingConfig.RampingVersion != nil {
l.Info("Clearing ramping version", "buildID", routingConfig.RampingVersion.BuildID)
if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
BuildID: "",
Percentage: 0,
Identity: getControllerIdentity(),
}); err != nil {
l.Info("Failed to clear ramping version (may have been cleared by SetCurrentVersion)", "error", err)
}
} else {
l.Info("No ramping version set, skipping clear ramping version")
}
Comment thread
anujagrawal380 marked this conversation as resolved.

// Step 3: Delete versions that are eligible. Versions that are still draining
// are force-deleted with SkipDrainage since the TWD is being removed entirely.
// If any version fails to delete (e.g. active pollers), return an error so the
// reconciler requeues. Pollers disappear once pods terminate and the next
// reconciliation will succeed.
for _, version := range resp.Info.VersionSummaries {
buildID := version.Version.BuildID
l.Info("Deleting worker deployment version", "buildID", buildID)
if _, err := deploymentHandler.DeleteVersion(ctx, sdkclient.WorkerDeploymentDeleteVersionOptions{
BuildID: buildID,
SkipDrainage: true,
Identity: getControllerIdentity(),
}); err != nil {
return fmt.Errorf("unable to delete version %s (will retry): %w", buildID, err)
}
}

// Step 4: Delete the deployment itself. This only succeeds if all versions are gone.
l.Info("Attempting to delete worker deployment from Temporal server", "name", workerDeploymentName)
if _, err := temporalClient.WorkerDeploymentClient().Delete(ctx, sdkclient.WorkerDeploymentDeleteOptions{
Name: workerDeploymentName,
Identity: getControllerIdentity(),
}); err != nil {
return fmt.Errorf("unable to delete worker deployment %s (will retry): %w", workerDeploymentName, err)
}

return nil
}

// setCondition sets a condition on the TemporalWorkerDeployment status.
func (r *TemporalWorkerDeploymentReconciler) setCondition(
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
Expand Down Expand Up @@ -423,6 +616,74 @@ func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetBlocked(
_ = r.Status().Update(ctx, workerDeploy)
}

// ensureConnectionFinalizer adds our finalizer to the TemporalConnection so it
// cannot be deleted while this TWD still needs it for cleanup.
func (r *TemporalWorkerDeploymentReconciler) ensureConnectionFinalizer(
ctx context.Context,
l logr.Logger,
tc *temporaliov1alpha1.TemporalConnection,
) error {
if !controllerutil.ContainsFinalizer(tc, finalizerName) {
l.Info("Adding finalizer to TemporalConnection", "connection", tc.Name)
controllerutil.AddFinalizer(tc, finalizerName)
if err := r.Update(ctx, tc); err != nil {
return fmt.Errorf("unable to add finalizer to TemporalConnection %q: %w", tc.Name, err)
}
}
return nil
}

// removeConnectionFinalizerIfUnused removes our finalizer from the TemporalConnection
// if no other TWDs (besides the one being deleted) still reference it.
func (r *TemporalWorkerDeploymentReconciler) removeConnectionFinalizerIfUnused(
ctx context.Context,
l logr.Logger,
deletingTWD *temporaliov1alpha1.TemporalWorkerDeployment,
) error {
connectionName := deletingTWD.Spec.WorkerOptions.TemporalConnectionRef.Name

// List all TWDs in the same namespace
var twds temporaliov1alpha1.TemporalWorkerDeploymentList
if err := r.List(ctx, &twds, client.InNamespace(deletingTWD.Namespace)); err != nil {
return fmt.Errorf("unable to list TWDs: %w", err)
}

// Check if any other TWD (not the one being deleted) references this connection
for i := range twds.Items {
twd := &twds.Items[i]
if twd.Name == deletingTWD.Name {
continue
}
if twd.Spec.WorkerOptions.TemporalConnectionRef.Name == connectionName {
l.Info("TemporalConnection still referenced by another TWD, keeping finalizer",
"connection", connectionName, "referencedBy", twd.Name)
return nil
}
}

// No other TWDs reference this connection, remove the finalizer
var tc temporaliov1alpha1.TemporalConnection
if err := r.Get(ctx, types.NamespacedName{
Name: connectionName,
Namespace: deletingTWD.Namespace,
}, &tc); err != nil {
if apierrors.IsNotFound(err) {
return nil // already gone
}
return fmt.Errorf("unable to fetch TemporalConnection %q: %w", connectionName, err)
}

if controllerutil.ContainsFinalizer(&tc, finalizerName) {
l.Info("Removing finalizer from TemporalConnection", "connection", connectionName)
controllerutil.RemoveFinalizer(&tc, finalizerName)
if err := r.Update(ctx, &tc); err != nil {
return fmt.Errorf("unable to remove finalizer from TemporalConnection %q: %w", connectionName, err)
}
}

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string {
Expand Down
Loading
Loading