diff --git a/api/v1alpha1/temporalworker_webhook.go b/api/v1alpha1/temporalworker_webhook.go
index 6772fc8b..869f7569 100644
--- a/api/v1alpha1/temporalworker_webhook.go
+++ b/api/v1alpha1/temporalworker_webhook.go
@@ -57,6 +57,14 @@ func (s *TemporalWorkerDeploymentSpec) Default(ctx context.Context) error {
 		s.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaults.DeleteDelay}
 	}
 
+	if s.RollbackStrategy == nil {
+		s.RollbackStrategy = &RollbackStrategy{Strategy: RollbackAllAtOnce}
+	} else if s.RollbackStrategy.Strategy == "" {
+		s.RollbackStrategy.Strategy = RollbackAllAtOnce
+	}
+	if s.RollbackStrategy.MaxVersionAge == nil {
+		s.RollbackStrategy.MaxVersionAge = &v1.Duration{Duration: defaults.RollbackMaxVersionAge}
+	}
 	return nil
 }
 
@@ -94,41 +102,26 @@ func validateForUpdateOrCreate(old, new *TemporalWorkerDeployment) (admission.Wa
 	}
 
 	allErrs = append(allErrs, validateRolloutStrategy(new.Spec.RolloutStrategy)...)
+	// RollbackStrategy is an optional pointer: validate a local copy that falls back to the
+	// documented AllAtOnce default instead of dereferencing nil when defaulting has not run.
+	rollback := RollbackStrategy{Strategy: RollbackAllAtOnce}
+	if new.Spec.RollbackStrategy != nil {
+		rollback = *new.Spec.RollbackStrategy
+	}
+	allErrs = append(allErrs, validateRollbackStrategy(rollback)...)
 
 	if len(allErrs) > 0 {
 		return nil, newInvalidErr(new, allErrs)
 	}
 
-	return nil, nil
+	return warnRollbackSlowerThanRollout(new.Spec.RolloutStrategy, rollback), nil
 }
 
 func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
 	var allErrs []*field.Error
 
 	if s.Strategy == UpdateProgressive {
-		rolloutSteps := s.Steps
-		if len(rolloutSteps) == 0 {
-			allErrs = append(allErrs,
-				field.Invalid(field.NewPath("spec.rollout.steps"), rolloutSteps, "steps are required for Progressive rollout"),
-			)
-		}
-		var lastRamp int
-		for i, s := range rolloutSteps {
-			// Check duration >= 30s
-			if s.PauseDuration.Duration < 30*time.Second {
-				allErrs = append(allErrs,
-					field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].pauseDuration", i)), s.PauseDuration.Duration.String(), "pause duration must be at least 30s"),
-				)
-			}
-
-			// Check ramp value greater than last
-			if s.RampPercentage <= lastRamp {
-				allErrs = append(allErrs,
-					field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].rampPercentage", i)), s.RampPercentage, "rampPercentage must increase between each step"),
-				)
-			}
-			lastRamp = s.RampPercentage
-		}
+		allErrs = append(allErrs, validateProgressiveStrategySteps("spec.rollout.steps", s.Steps)...)
 	}
 
 	// Validate gate input fields
@@ -155,6 +148,70 @@ func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
 	return allErrs
 }
 
+func validateRollbackStrategy(s RollbackStrategy) []*field.Error {
+	var allErrs []*field.Error
+	if s.Strategy == RollbackProgressive {
+		allErrs = append(allErrs, validateProgressiveStrategySteps("spec.rollback.steps", s.Steps)...)
+	}
+	return allErrs
+}
+
+// warnRollbackSlowerThanRollout returns an admission warning when the configured rollback
+// would shift traffic more slowly than the rollout: a non-AllAtOnce rollback paired with an
+// AllAtOnce rollout, or a Progressive rollback whose summed pause durations exceed those of
+// a Progressive rollout. Every other combination yields nil.
+func warnRollbackSlowerThanRollout(rollout RolloutStrategy, rollback RollbackStrategy) admission.Warnings {
+	if rollout.Strategy == UpdateAllAtOnce && rollback.Strategy != RollbackAllAtOnce {
+		return admission.Warnings{"rollback strategy is slower than rollout: rollout is AllAtOnce, but rollback is Progressive — is that intended?"}
+	}
+	if rollout.Strategy != UpdateProgressive || rollback.Strategy != RollbackProgressive {
+		return nil
+	}
+
+	// Both strategies are Progressive: compare the total time spent pausing between steps.
+	var rolloutPause, rollbackPause time.Duration
+	for i := range rollout.Steps {
+		rolloutPause += rollout.Steps[i].PauseDuration.Duration
+	}
+	for i := range rollback.Steps {
+		rollbackPause += rollback.Steps[i].PauseDuration.Duration
+	}
+	if rollbackPause <= rolloutPause {
+		return nil
+	}
+	return admission.Warnings{fmt.Sprintf(
+		"rollback strategy is slower than rollout: progressive rollback total duration (%s) exceeds progressive rollout total duration (%s) — is that intended?",
+		rollbackPause, rolloutPause,
+	)}
+}
+
+// validateProgressiveStrategySteps checks the steps of a Progressive strategy whose steps
+// field lives at specName: at least one step is required, every pause must be at least 30s,
+// and each rampPercentage must exceed the previous step's value (the first must exceed 0).
+func validateProgressiveStrategySteps(specName string, steps []RolloutStep) []*field.Error {
+	const minPause = 30 * time.Second
+	stepsPath := field.NewPath(specName)
+	var errs []*field.Error
+	if len(steps) == 0 {
+		errs = append(errs, field.Invalid(stepsPath, steps, "steps are required for Progressive strategy"))
+	}
+
+	prevRamp := 0
+	for i := range steps {
+		pause, ramp := steps[i].PauseDuration.Duration, steps[i].RampPercentage
+		stepPath := stepsPath.Index(i)
+		if pause < minPause {
+			errs = append(errs, field.Invalid(stepPath.Child("pauseDuration"), pause.String(), "pause duration must be at least 30s"))
+		}
+		// A rejected ramp value still becomes the baseline for the next comparison.
+		if ramp <= prevRamp {
+			errs = append(errs, field.Invalid(stepPath.Child("rampPercentage"), ramp, "rampPercentage must increase between each step"))
+		}
+		prevRamp = ramp
+	}
+	return errs
+}
+
 func newInvalidErr(dep *TemporalWorkerDeployment, errs field.ErrorList)
*apierrors.StatusError { return apierrors.NewInvalid(dep.GroupVersionKind().GroupKind(), dep.GetName(), errs) } diff --git a/api/v1alpha1/temporalworker_webhook_test.go b/api/v1alpha1/temporalworker_webhook_test.go index 15b48915..3f6b4f78 100644 --- a/api/v1alpha1/temporalworker_webhook_test.go +++ b/api/v1alpha1/temporalworker_webhook_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/defaults" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,8 +24,9 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { tests := map[string]struct { obj runtime.Object errorMsg string + warnMsg string }{ - "valid temporal worker deployment": { + "valid default temporal worker deployment": { obj: testhelpers.MakeTWDWithName("valid-worker", ""), }, "temporal worker deployment with name too long": { @@ -39,15 +41,15 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { }, errorMsg: "expected a TemporalWorkerDeployment", }, - "missing rollout steps": { + "rollout strategy - invalid Progressive without steps": { obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-missing-steps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive obj.Spec.RolloutStrategy.Steps = nil return obj }), - errorMsg: "spec.rollout.steps: Invalid value: null: steps are required for Progressive rollout", + errorMsg: "spec.rollout.steps: Invalid value: null: steps are required for Progressive strategy", }, - "ramp value for step <= previous step": { + "rollout strategy - invalid Progressive with non-increasing ramp": { obj: 
testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive obj.Spec.RolloutStrategy.Steps = []temporaliov1alpha1.RolloutStep{ @@ -62,7 +64,7 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { }), errorMsg: "[spec.rollout.steps[2].rampPercentage: Invalid value: 9: rampPercentage must increase between each step, spec.rollout.steps[4].rampPercentage: Invalid value: 50: rampPercentage must increase between each step]", }, - "pause duration < 30s": { + "rollout strategy - invalid Progressive pause duration < 30s": { obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("prog-rollout-decreasing-ramps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { obj.Spec.RolloutStrategy.Strategy = temporaliov1alpha1.UpdateProgressive obj.Spec.RolloutStrategy.Steps = []temporaliov1alpha1.RolloutStep{ @@ -74,6 +76,107 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { }), errorMsg: `spec.rollout.steps[1].pauseDuration: Invalid value: "10s": pause duration must be at least 30s`, }, + "rollback strategy - Progressive rollback with AllAtOnce rollout warns": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("rollback-progressive", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: 30 * time.Second}}, + }, + } + return obj + }), + warnMsg: "rollback strategy is slower than rollout", + }, + "rollback strategy - AllAtOnce rollback with Progressive rollout is valid": { + obj: 
testhelpers.ModifyObj(testhelpers.MakeTWDWithName("rollback-allat-once-prog-rollout", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RolloutStrategy = temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: time.Minute}}, + {75, metav1.Duration{Duration: time.Minute}}, + }, + } + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + } + return obj + }), + }, + "rollback strategy - Progressive rollback faster than Progressive rollout is valid": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("rollback-prog-faster", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RolloutStrategy = temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: 2 * time.Minute}}, + {75, metav1.Duration{Duration: 2 * time.Minute}}, + }, + } + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: time.Minute}}, + {75, metav1.Duration{Duration: time.Minute}}, + }, + } + return obj + }), + }, + "rollback strategy - Progressive rollback slower than Progressive rollout warns": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("rollback-prog-slower", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RolloutStrategy = temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: time.Minute}}, + {75, metav1.Duration{Duration: time.Minute}}, + }, + } + 
obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: 2 * time.Minute}}, + {75, metav1.Duration{Duration: 2 * time.Minute}}, + }, + } + return obj + }), + warnMsg: "rollback strategy is slower than rollout", + }, + "rollback strategy - invalid Progressive without steps": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("rollback-progressive-no-steps", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: nil, + } + return obj + }), + errorMsg: "steps are required for Progressive strategy", + }, + "rollback strategy - invalid Progressive pause duration < 30s": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("rollback-progressive-invalid", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: 10 * time.Second}}, + }, + } + return obj + }), + errorMsg: "pause duration must be at least 30s", + }, + "rollback strategy - invalid Progressive with non-increasing ramp": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("rollback-progressive-decreasing", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: time.Minute}}, + {25, metav1.Duration{Duration: time.Minute}}, + }, + } + return obj + }), + errorMsg: "rampPercentage must increase 
between each step", + }, } for name, tc := range tests { @@ -89,8 +192,12 @@ func TestTemporalWorkerDeployment_ValidateCreate(t *testing.T) { require.NoError(t, err) } - // Warnings should always be nil for this implementation - assert.Nil(t, warnings) + if tc.warnMsg != "" { + require.NotEmpty(t, warnings) + assert.Contains(t, warnings[0], tc.warnMsg) + } else { + assert.Empty(t, warnings) + } } // Verify that create and update enforce the same rules @@ -168,6 +275,63 @@ func TestTemporalWorkerDeployment_Default(t *testing.T) { assert.Equal(t, 24*time.Hour, obj.Spec.SunsetStrategy.DeleteDelay.Duration) }, }, + "rollback strategy initialized when nil": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("default-rollback-nil", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = nil + return obj + }), + expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) { + require.NotNil(t, obj.Spec.RollbackStrategy, "expected RollbackStrategy to be initialized by webhook") + assert.Equal(t, temporaliov1alpha1.RollbackAllAtOnce, obj.Spec.RollbackStrategy.Strategy, "expected RollbackStrategy.Strategy to default to AllAtOnce") + require.NotNil(t, obj.Spec.RollbackStrategy.MaxVersionAge) + assert.Equal(t, defaults.RollbackMaxVersionAge, obj.Spec.RollbackStrategy.MaxVersionAge.Duration) + }, + }, + "rollback strategy defaults empty strategy field to AllAtOnce": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("default-rollback-empty", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: "", + } + return obj + }), + expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) { + require.NotNil(t, obj.Spec.RollbackStrategy) + assert.Equal(t, temporaliov1alpha1.RollbackAllAtOnce, 
obj.Spec.RollbackStrategy.Strategy, "expected RollbackStrategy.Strategy to default to AllAtOnce") + require.NotNil(t, obj.Spec.RollbackStrategy.MaxVersionAge) + assert.Equal(t, defaults.RollbackMaxVersionAge, obj.Spec.RollbackStrategy.MaxVersionAge.Duration) + }, + }, + "rollback strategy preserves explicit strategy": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("explicit-rollback-progressive", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {50, metav1.Duration{Duration: 30 * time.Second}}, + }, + } + return obj + }), + expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) { + require.NotNil(t, obj.Spec.RollbackStrategy) + assert.Equal(t, temporaliov1alpha1.RollbackProgressive, obj.Spec.RollbackStrategy.Strategy, "expected RollbackStrategy.Strategy to remain Progressive") + require.NotNil(t, obj.Spec.RollbackStrategy.MaxVersionAge) + assert.Equal(t, defaults.RollbackMaxVersionAge, obj.Spec.RollbackStrategy.MaxVersionAge.Duration) + }, + }, + "rollback strategy preserves explicit MaxVersionAge": { + obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("explicit-rollback-max-version-age", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { + obj.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + MaxVersionAge: &metav1.Duration{Duration: 30 * time.Minute}, + } + return obj + }), + expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) { + require.NotNil(t, obj.Spec.RollbackStrategy) + require.NotNil(t, obj.Spec.RollbackStrategy.MaxVersionAge) + assert.Equal(t, 30*time.Minute, obj.Spec.RollbackStrategy.MaxVersionAge.Duration, "expected explicit MaxVersionAge to be 
preserved") + }, + }, } for name, tc := range tests { diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index 17805a42..b1a00abd 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -83,6 +83,22 @@ type TemporalWorkerDeploymentSpec struct { // How to rollout new workflow executions to the target version. RolloutStrategy RolloutStrategy `json:"rollout"` + // How to rollback to a previous version. If not specified, defaults to AllAtOnce strategy. + // + // A rollback is triggered automatically when the target version's pod spec is updated and + // the resulting build ID has previously been set as the default (current) version of the + // worker deployment. The controller detects this by checking whether Temporal recorded a + // non-nil LastCurrentTime for that build ID. + // + // The rollback strategy controls routing of NEW workflow executions only. Workflows already + // running are pinned to the version they started on and continue executing there; they are + // not affected by the rollback. Only new workflow executions will be routed to the rollback + // target version. + // + // Rollback is suppressed when the rollout strategy is Manual. + // +optional + RollbackStrategy *RollbackStrategy `json:"rollback,omitempty"` + // How to manage sunsetting drained versions. SunsetStrategy SunsetStrategy `json:"sunset"` @@ -332,17 +348,17 @@ type DeprecatedWorkerDeploymentVersion struct { EligibleForDeletion bool `json:"eligibleForDeletion,omitempty"` } -// DefaultVersionUpdateStrategy describes how to cut over new workflow executions +// VersionRolloutStrategy describes how to cut over new workflow executions // to the target worker deployment version. // +kubebuilder:validation:Enum=Manual;AllAtOnce;Progressive -type DefaultVersionUpdateStrategy string +type VersionRolloutStrategy string const ( // UpdateManual scales worker resources up or down, but does not update the current or ramping worker deployment version. 
- UpdateManual DefaultVersionUpdateStrategy = "Manual" + UpdateManual VersionRolloutStrategy = "Manual" // UpdateAllAtOnce starts 100% of new workflow executions on the new worker deployment version as soon as it's healthy. - UpdateAllAtOnce DefaultVersionUpdateStrategy = "AllAtOnce" + UpdateAllAtOnce VersionRolloutStrategy = "AllAtOnce" // UpdateProgressive ramps up the percentage of new workflow executions targeting the new worker deployment version over time. // @@ -352,7 +368,19 @@ const ( // Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If // there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task // queues in this worker deployment can be handled by an active poller. - UpdateProgressive DefaultVersionUpdateStrategy = "Progressive" + UpdateProgressive VersionRolloutStrategy = "Progressive" +) + +// VersionRollbackStrategy describes how to cut over during rollback to a previous version. +// +kubebuilder:validation:Enum=AllAtOnce;Progressive +type VersionRollbackStrategy string + +const ( + // RollbackAllAtOnce immediately switches 100% of traffic back to the previous version. + RollbackAllAtOnce VersionRollbackStrategy = "AllAtOnce" + + // RollbackProgressive gradually ramps traffic back to the previous version. + RollbackProgressive VersionRollbackStrategy = "Progressive" ) type GateWorkflowConfig struct { @@ -385,7 +413,7 @@ type RolloutStrategy struct { // - "Manual" // - "AllAtOnce" // - "Progressive" - Strategy DefaultVersionUpdateStrategy `json:"strategy"` + Strategy VersionRolloutStrategy `json:"strategy"` // Gate specifies a workflow type that must run once to completion on the new worker deployment version before // any traffic is directed to the new version. 
@@ -396,6 +424,27 @@ type RolloutStrategy struct {
 	Steps []RolloutStep `json:"steps,omitempty" protobuf:"bytes,3,rep,name=steps"`
 }
 
+// RollbackStrategy defines strategy to apply when rolling back to a previous version.
+// This is separate from RolloutStrategy because rollbacks have different requirements:
+// - No gate workflow (already trusted version)
+// - No manual mode (rollbacks should be automatic)
+// - Default to AllAtOnce for fast recovery
+type RollbackStrategy struct {
+	// Strategy for rollback. Valid values are "AllAtOnce" or "Progressive".
+	// Defaults to "AllAtOnce" for fast recovery.
+	Strategy VersionRollbackStrategy `json:"strategy"`
+
+	// Steps to execute progressive rollbacks. Only required when strategy is "Progressive".
+	// +optional
+	Steps []RolloutStep `json:"steps,omitempty"`
+
+	// MaxVersionAge limits which versions are eligible as rollback targets: a version only
+	// qualifies if it was last current within this duration. When the field is omitted, the
+	// defaulting webhook sets defaults.RollbackMaxVersionAge; if still nil, there is no age limit.
+	// +optional
+	MaxVersionAge *metav1.Duration `json:"maxVersionAge,omitempty"`
+}
+
 // SunsetStrategy defines strategy to apply when sunsetting k8s deployments of drained versions.
 type SunsetStrategy struct {
 	// ScaledownDelay specifies how long to wait after a version is drained before scaling its Deployment to zero.
diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go
index 55954ac8..f31c54d8 100644
--- a/api/v1alpha1/zz_generated.deepcopy.go
+++ b/api/v1alpha1/zz_generated.deepcopy.go
@@ -156,6 +156,31 @@ func (in *ManualRolloutStrategy) DeepCopy() *ManualRolloutStrategy {
 	return out
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *RollbackStrategy) DeepCopyInto(out *RollbackStrategy) { + *out = *in + if in.Steps != nil { + in, out := &in.Steps, &out.Steps + *out = make([]RolloutStep, len(*in)) + copy(*out, *in) + } + if in.MaxVersionAge != nil { + in, out := &in.MaxVersionAge, &out.MaxVersionAge + *out = new(metav1.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RollbackStrategy. +func (in *RollbackStrategy) DeepCopy() *RollbackStrategy { + if in == nil { + return nil + } + out := new(RollbackStrategy) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RolloutStep) DeepCopyInto(out *RolloutStep) { *out = *in @@ -489,6 +514,11 @@ func (in *TemporalWorkerDeploymentSpec) DeepCopyInto(out *TemporalWorkerDeployme **out = **in } in.RolloutStrategy.DeepCopyInto(&out.RolloutStrategy) + if in.RollbackStrategy != nil { + in, out := &in.RollbackStrategy, &out.RollbackStrategy + *out = new(RollbackStrategy) + (*in).DeepCopyInto(*out) + } in.SunsetStrategy.DeepCopyInto(&out.SunsetStrategy) out.WorkerOptions = in.WorkerOptions } diff --git a/docs/configuration.md b/docs/configuration.md index 72c43e9a..9c6cc58e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -5,10 +5,11 @@ This document provides comprehensive configuration options for the Temporal Work ## Table of Contents 1. [Rollout Strategies](#rollout-strategies) -2. [Sunset Configuration](#sunset-configuration) -3. [Worker Options](#worker-options) -4. [Gate Configuration](#gate-configuration) -5. [Advanced Configuration](#advanced-configuration) +2. [Rollback Strategy](#rollback-strategy) +3. [Sunset Configuration](#sunset-configuration) +4. [Worker Options](#worker-options) +5. [Gate Configuration](#gate-configuration) +6. 
[Advanced Configuration](#advanced-configuration) ## Rollout Strategies @@ -110,6 +111,60 @@ rollout: pauseDuration: 0s # Full rollout after canary validation ``` +## Rollback Strategy + +The controller automatically detects rollbacks: if the target version was previously the current version (within `maxVersionAge`), it applies the rollback strategy instead of the rollout strategy. This means setting your target version back to a prior build ID is enough to trigger a rollback — no special action required. + +> **Note:** Rollback is suppressed when the rollout strategy is `Manual`, since manual mode implies full user control. + +### Default Rollback Configuration + +```yaml +rollback: + strategy: AllAtOnce # Default: immediately switch 100% of traffic back + maxVersionAge: 1h # Only treat versions as rollback targets if they were + # current within the last hour. Set to 0s to disable rollback. +``` + +### Rollback Strategies + +**AllAtOnce (default):** Immediately routes 100% of traffic back to the previous version. Recommended for fast recovery. + +```yaml +rollback: + strategy: AllAtOnce + maxVersionAge: 1h +``` + +**Progressive:** Gradually ramps traffic back, using the same step-based mechanism as progressive rollouts. Use this when you want a controlled rollback, for example to observe metrics during the reversion. + +```yaml +rollback: + strategy: Progressive + maxVersionAge: 1h + steps: + - rampPercentage: 50 + pauseDuration: 5m + - rampPercentage: 75 + pauseDuration: 5m +``` + +> **Warning:** The controller will warn at admission time if the rollback strategy is slower than the rollout strategy, since a slow rollback defeats the purpose of fast recovery. If you configure a progressive rollback with a longer total duration than your rollout steps, verify this is intentional. + +### `maxVersionAge` + +Controls which versions are eligible as rollback targets. 
A version is only considered a rollback target if the last time it was the current version falls within this duration. Set to a short duration (e.g. `1h`) to limit rollbacks to recently-promoted versions. + +### Disabling Rollbacks + +Set `maxVersionAge` to `0s` to disable automatic rollback detection entirely. The controller will treat every target version as a fresh rollout, regardless of its history. + +```yaml +rollback: + strategy: AllAtOnce + maxVersionAge: 0s +``` + ## Sunset Configuration Controls how old versions are scaled down and cleaned up after they're no longer receiving new traffic: diff --git a/go.mod b/go.mod index b582432a..8d589d04 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/onsi/gomega v1.36.1 github.com/pborman/uuid v1.2.1 github.com/stretchr/testify v1.11.1 - go.temporal.io/api v1.60.2 + go.temporal.io/api v1.61.0 go.temporal.io/sdk v1.38.0 go.temporal.io/sdk/contrib/envconfig v0.1.0 go.temporal.io/server v1.30.1 @@ -21,6 +21,7 @@ require ( k8s.io/apimachinery v0.34.0 k8s.io/client-go v0.34.0 sigs.k8s.io/controller-runtime v0.21.0 + sigs.k8s.io/yaml v1.6.0 ) require ( @@ -191,5 +192,4 @@ require ( sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect - sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/go.sum b/go.sum index 84cf9ca1..12326c8d 100644 --- a/go.sum +++ b/go.sum @@ -396,8 +396,8 @@ go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZY go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= go.opentelemetry.io/proto/otlp v1.7.1 h1:gTOMpGDb0WTBOP8JaO72iL3auEZhVmAQg4ipjOVAtj4= go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= -go.temporal.io/api v1.60.2 h1:xqUqdPeOu8/HNWVPu51P6tVoBJ5kRh8nBI62xXi+IWg= -go.temporal.io/api v1.60.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= 
+go.temporal.io/api v1.61.0 h1:gGIvUu1pRE9yVKqlirYd5FGDT5N/hvcZ0tlB4mRvVM4= +go.temporal.io/api v1.61.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.temporal.io/sdk/contrib/envconfig v0.1.0 h1:s+G/Ujph+Xl2jzLiiIm2T1vuijDkUL4Kse49dgDVGBE= diff --git a/go.work.sum b/go.work.sum index 7456a02b..2ea612ad 100644 --- a/go.work.sum +++ b/go.work.sum @@ -820,6 +820,7 @@ github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -1252,6 +1253,7 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13 h1:fVcFKWvrslecOb/tg+Cc05dkeYx540o0FuFt3nUVDoE= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM= @@ -1444,6 +1446,7 @@ golang.org/x/mod 
v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= @@ -1481,6 +1484,7 @@ golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -1500,6 +1504,7 @@ golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1514,6 +1519,7 @@ golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbht golang.org/x/oauth2 v0.26.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/oauth2 v0.27.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= golang.org/x/oauth2 v0.28.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8= +golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -1527,6 +1533,7 @@ golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1559,7 +1566,9 @@ golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= 
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1659,6 +1668,7 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= @@ -1670,6 +1680,7 @@ golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w= +golang.org/x/tools v0.38.0/go.mod 
h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= diff --git a/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml index d4c5ce21..410619bd 100644 --- a/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml @@ -58,6 +58,32 @@ spec: replicas: format: int32 type: integer + rollback: + properties: + maxVersionAge: + type: string + steps: + items: + properties: + pauseDuration: + type: string + rampPercentage: + maximum: 99 + minimum: 1 + type: integer + required: + - pauseDuration + - rampPercentage + type: object + type: array + strategy: + enum: + - AllAtOnce + - Progressive + type: string + required: + - strategy + type: object rollout: properties: gate: diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index ad5e54a7..02da8cf8 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -90,43 +90,18 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( } rolloutStrategy := w.Spec.RolloutStrategy + rollbackStrategy := w.Spec.RollbackStrategy - // Resolve gate input if gate is configured - var gateInput []byte - var isGateInputSecret bool - if rolloutStrategy.Gate != nil { - // Fetch ConfigMap or Secret data if needed - var configMapData map[string]string - var configMapBinaryData map[string][]byte - var secretData map[string][]byte - - if rolloutStrategy.Gate.InputFrom != nil { - if cmRef := rolloutStrategy.Gate.InputFrom.ConfigMapKeyRef; 
cmRef != nil { - cm := &corev1.ConfigMap{} - if err := r.Client.Get(ctx, types.NamespacedName{Namespace: w.Namespace, Name: cmRef.Name}, cm); err != nil { - return nil, fmt.Errorf("failed to get ConfigMap %s/%s: %w", w.Namespace, cmRef.Name, err) - } - configMapData = cm.Data - configMapBinaryData = cm.BinaryData - } - if secRef := rolloutStrategy.Gate.InputFrom.SecretKeyRef; secRef != nil { - sec := &corev1.Secret{} - if err := r.Client.Get(ctx, types.NamespacedName{Namespace: w.Namespace, Name: secRef.Name}, sec); err != nil { - return nil, fmt.Errorf("failed to get Secret %s/%s: %w", w.Namespace, secRef.Name, err) - } - secretData = sec.Data - } - } - - gateInput, isGateInputSecret, err = planner.ResolveGateInput(rolloutStrategy.Gate, w.Namespace, configMapData, configMapBinaryData, secretData) - if err != nil { - return nil, fmt.Errorf("unable to resolve gate input: %w", err) - } + // Resolve gate workflow if needed + gateInput, isGateInputSecret, err := r.resolveGateWorkflow(ctx, l, w, rolloutStrategy, temporalState) + if err != nil { + return nil, fmt.Errorf("unable to resolve gate input: %w", err) } // Generate the plan using the planner package plannerConfig := &planner.Config{ - RolloutStrategy: rolloutStrategy, + RolloutStrategy: rolloutStrategy, + RollbackStrategy: rollbackStrategy, } // Fetch all WorkerResourceTemplates that reference this TWD so that the planner @@ -195,6 +170,43 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( return plan, nil } +func (r *TemporalWorkerDeploymentReconciler) resolveGateWorkflow( + ctx context.Context, + l logr.Logger, + w *temporaliov1alpha1.TemporalWorkerDeployment, + rolloutStrategy temporaliov1alpha1.RolloutStrategy, + temporalState *temporal.TemporalWorkerState, +) (gateInput []byte, isSecret bool, err error) { + if rolloutStrategy.Gate == nil { + return nil, false, nil + } + + // Fetch ConfigMap or Secret data if needed + var configMapData map[string]string + var configMapBinaryData map[string][]byte 
+ var secretData map[string][]byte + + if rolloutStrategy.Gate.InputFrom != nil { + if cmRef := rolloutStrategy.Gate.InputFrom.ConfigMapKeyRef; cmRef != nil { + cm := &corev1.ConfigMap{} + if err := r.Client.Get(ctx, types.NamespacedName{Namespace: w.Namespace, Name: cmRef.Name}, cm); err != nil { + return nil, false, fmt.Errorf("failed to get ConfigMap %s/%s: %w", w.Namespace, cmRef.Name, err) + } + configMapData = cm.Data + configMapBinaryData = cm.BinaryData + } + if secRef := rolloutStrategy.Gate.InputFrom.SecretKeyRef; secRef != nil { + sec := &corev1.Secret{} + if err := r.Client.Get(ctx, types.NamespacedName{Namespace: w.Namespace, Name: secRef.Name}, sec); err != nil { + return nil, false, fmt.Errorf("failed to get Secret %s/%s: %w", w.Namespace, secRef.Name, err) + } + secretData = sec.Data + } + } + + return planner.ResolveGateInput(rolloutStrategy.Gate, w.Namespace, configMapData, configMapBinaryData, secretData) +} + // Create a new deployment with owner reference func (r *TemporalWorkerDeploymentReconciler) newDeployment( w *temporaliov1alpha1.TemporalWorkerDeployment, diff --git a/internal/defaults/defaults.go b/internal/defaults/defaults.go index 0eec90dd..2c5e903c 100644 --- a/internal/defaults/defaults.go +++ b/internal/defaults/defaults.go @@ -9,6 +9,7 @@ import "time" const ( ScaledownDelay = 1 * time.Hour DeleteDelay = 24 * time.Hour + RollbackMaxVersionAge = 1 * time.Hour ServerMaxVersions = 100 MaxVersionsIneligibleForDeletion = int32(ServerMaxVersions * 0.75) ControllerIdentity = "temporal-worker-controller" diff --git a/internal/k8s/deployments_test.go b/internal/k8s/deployments_test.go index 7b5354bc..9bfc57e7 100644 --- a/internal/k8s/deployments_test.go +++ b/internal/k8s/deployments_test.go @@ -233,8 +233,8 @@ func TestGenerateBuildID(t *testing.T) { pod1 := testhelpers.MakePodSpec([]corev1.Container{{Image: img}}, map[string]string{"pod": "1"}, "") pod2 := testhelpers.MakePodSpec([]corev1.Container{{Image: img}}, 
map[string]string{"pod": "2"}, "") - twd1 := testhelpers.MakeTWD("", "", 1, pod1, nil, nil, nil) - twd2 := testhelpers.MakeTWD("", "", 1, pod2, nil, nil, nil) + twd1 := testhelpers.MakeDefaultTWD("", "", 1, pod1) + twd2 := testhelpers.MakeDefaultTWD("", "", 1, pod2) return twd1, twd2 }, expectedPrefix: "my.test_image", @@ -246,8 +246,8 @@ func TestGenerateBuildID(t *testing.T) { generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { img := "my.test_image" pod := testhelpers.MakePodSpec([]corev1.Container{{Image: img}}, nil, "") - twd1 := testhelpers.MakeTWD("", "", 1, pod, nil, nil, nil) - twd2 := testhelpers.MakeTWD("", "", 2, pod, nil, nil, nil) + twd1 := testhelpers.MakeDefaultTWD("", "", 1, pod) + twd2 := testhelpers.MakeDefaultTWD("", "", 2, pod) return twd1, twd2 }, expectedPrefix: "my.test_image", @@ -257,7 +257,7 @@ func TestGenerateBuildID(t *testing.T) { { name: "no containers", generateInputs: func() (*temporaliov1alpha1.TemporalWorkerDeployment, *temporaliov1alpha1.TemporalWorkerDeployment) { - twd := testhelpers.MakeTWD("", "", 1, testhelpers.MakePodSpec(nil, nil, ""), nil, nil, nil) + twd := testhelpers.MakeDefaultTWD("", "", 1, testhelpers.MakePodSpec(nil, nil, "")) return twd, nil // only check 1 result, no need to compare }, expectedPrefix: "", diff --git a/internal/planner/planner.go b/internal/planner/planner.go index dc283912..4219c150 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -118,6 +118,8 @@ type WorkflowConfig struct { type Config struct { // RolloutStrategy to use RolloutStrategy temporaliov1alpha1.RolloutStrategy + // RollbackStrategy to use + RollbackStrategy *temporaliov1alpha1.RollbackStrategy } // GeneratePlan creates a plan for updating the worker deployment @@ -759,7 +761,51 @@ func getTestWorkflows( return testWorkflows } -// getVersionConfigDiff determines the version configuration based on the rollout strategy +func 
isRollbackScenario( + l logr.Logger, + status *temporaliov1alpha1.TemporalWorkerDeploymentStatus, + temporalState *temporal.TemporalWorkerState, + config *Config, +) bool { + if config.RollbackStrategy == nil { + return false + } + + if config.RolloutStrategy.Strategy == temporaliov1alpha1.UpdateManual { + return false + } + + if temporalState == nil { + return false + } + + targetVersionInfo, exists := temporalState.Versions[status.TargetVersion.BuildID] + if !exists { + return false + } + + if targetVersionInfo.LastCurrentTime == nil { + return false + } + + if config.RollbackStrategy.MaxVersionAge != nil && time.Since(*targetVersionInfo.LastCurrentTime) > config.RollbackStrategy.MaxVersionAge.Duration { + l.Info("Skipping rollback: the version's last current time exceeds MaxVersionAge", + "targetBuildID", status.TargetVersion.BuildID, + "lastCurrentTime", targetVersionInfo.LastCurrentTime, + "maxVersionAge", config.RollbackStrategy.MaxVersionAge.Duration) + return false + } + + l.Info("Detected rollback scenario using LastCurrentTime. "+ + "Warning: Auto-upgrade workflows that upgraded from a previous version to the current version may fail during this rollback, "+ + "as they may not handle downgrades properly. 
Monitor workflow executions for failures.", + "targetBuildID", status.TargetVersion.BuildID, + "lastCurrentTime", targetVersionInfo.LastCurrentTime) + + return true +} + +// getVersionConfigDiff determines the version configuration based on the rollout/rollback strategies func getVersionConfigDiff( l logr.Logger, status *temporaliov1alpha1.TemporalWorkerDeploymentStatus, @@ -767,9 +813,14 @@ func getVersionConfigDiff( config *Config, workerDeploymentName string, ) *VersionConfig { - strategy := config.RolloutStrategy - conflictToken := status.VersionConflictToken + var strategy temporaliov1alpha1.RolloutStrategy + if isRollbackScenario(l, status, temporalState, config) { + strategy = convertRollbackToRolloutStrategy(*config.RollbackStrategy) + } else { + strategy = config.RolloutStrategy + } + // Manual strategy check (only relevant for rollout) if strategy.Strategy == temporaliov1alpha1.UpdateManual { return nil } @@ -800,7 +851,7 @@ func getVersionConfigDiff( managerIdentity = temporalState.ManagerIdentity } vcfg := &VersionConfig{ - ConflictToken: conflictToken, + ConflictToken: status.VersionConflictToken, BuildID: status.TargetVersion.BuildID, ManagerIdentity: managerIdentity, } @@ -840,6 +891,25 @@ func getVersionConfigDiff( return nil } +// Convert to reuse rollout logic with different settings +func convertRollbackToRolloutStrategy(rb temporaliov1alpha1.RollbackStrategy) temporaliov1alpha1.RolloutStrategy { + var strategy temporaliov1alpha1.VersionRolloutStrategy + switch rb.Strategy { + case temporaliov1alpha1.RollbackAllAtOnce: + strategy = temporaliov1alpha1.UpdateAllAtOnce + case temporaliov1alpha1.RollbackProgressive: + strategy = temporaliov1alpha1.UpdateProgressive + default: + strategy = temporaliov1alpha1.UpdateAllAtOnce + } + + return temporaliov1alpha1.RolloutStrategy{ + Strategy: strategy, + Steps: rb.Steps, + Gate: nil, // Rollbacks don't have gates + } +} + // handleProgressiveRollout handles the progressive rollout strategy logic func 
handleProgressiveRollout( steps []temporaliov1alpha1.RolloutStep, diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index b9dab803..126f4671 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -3581,3 +3581,448 @@ func TestGetWRTOwnerRefPatches(t *testing.T) { require.Len(t, patches, 1) }) } + +func TestIsRollbackScenario(t *testing.T) { + recentLastCurrentTime := time.Now().Add(-5 * time.Minute) + oldLastCurrentTime := time.Now().Add(-2 * time.Hour) + + defaultConfig := &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{Strategy: temporaliov1alpha1.UpdateAllAtOnce}, + RollbackStrategy: &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + MaxVersionAge: &metav1.Duration{Duration: defaults.RollbackMaxVersionAge}, + }, + } + + testCases := []struct { + name string + status *temporaliov1alpha1.TemporalWorkerDeploymentStatus + temporalState *temporal.TemporalWorkerState + config *Config + expectedResult bool + }{ + { + name: "rollback detected via LastCurrentTime within MaxVersionAge", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + }, + }, + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &recentLastCurrentTime, + }, + }, + }, + config: defaultConfig, + expectedResult: true, + }, + { + name: "rollout when LastCurrentTime is nil", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v2", + }, + }, + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + 
"build-v2": { + BuildID: "build-v2", + LastCurrentTime: nil, + }, + }, + }, + config: defaultConfig, + expectedResult: false, + }, + { + name: "rollout when target version not in temporal state", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v3", + }, + }, + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &recentLastCurrentTime, + }, + }, + }, + config: defaultConfig, + expectedResult: false, + }, + { + name: "rollout when temporalState is nil", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + }, + }, + }, + temporalState: nil, + config: defaultConfig, + expectedResult: false, + }, + { + name: "rollout when version age exceeds MaxVersionAge", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + }, + }, + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &oldLastCurrentTime, + }, + }, + }, + config: defaultConfig, + expectedResult: false, + }, + { + name: "rollback disabled when MaxVersionAge is zero", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + }, + }, + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: 
map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &recentLastCurrentTime, + }, + }, + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + RollbackStrategy: &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + MaxVersionAge: &metav1.Duration{Duration: 0}, + }, + }, + expectedResult: false, + }, + { + name: "rollback skipped when rollout strategy is Manual", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + }, + }, + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &recentLastCurrentTime, + }, + }, + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateManual, + }, + RollbackStrategy: &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + MaxVersionAge: &metav1.Duration{Duration: defaults.RollbackMaxVersionAge}, + }, + }, + expectedResult: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + logger := logr.Discard() + result := isRollbackScenario(logger, tc.status, tc.temporalState, tc.config) + assert.Equal(t, tc.expectedResult, result) + }) + } +} + +func TestConvertRollbackToRolloutStrategy(t *testing.T) { + testCases := []struct { + name string + rollbackStrategy temporaliov1alpha1.RollbackStrategy + expectedStrategy temporaliov1alpha1.VersionRolloutStrategy + expectedSteps []temporaliov1alpha1.RolloutStep + expectedGate *temporaliov1alpha1.GateWorkflowConfig + }{ + { + name: "AllAtOnce rollback converts to AllAtOnce rollout", + rollbackStrategy: temporaliov1alpha1.RollbackStrategy{ + 
Strategy: temporaliov1alpha1.RollbackAllAtOnce, + }, + expectedStrategy: temporaliov1alpha1.UpdateAllAtOnce, + expectedSteps: nil, + expectedGate: nil, + }, + { + name: "Progressive rollback preserves all steps in order", + rollbackStrategy: temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {RampPercentage: 25, PauseDuration: metav1.Duration{Duration: 30 * time.Second}}, + {RampPercentage: 50, PauseDuration: metav1.Duration{Duration: time.Minute}}, + {RampPercentage: 75, PauseDuration: metav1.Duration{Duration: 2 * time.Minute}}, + }, + }, + expectedStrategy: temporaliov1alpha1.UpdateProgressive, + expectedSteps: []temporaliov1alpha1.RolloutStep{ + {RampPercentage: 25, PauseDuration: metav1.Duration{Duration: 30 * time.Second}}, + {RampPercentage: 50, PauseDuration: metav1.Duration{Duration: time.Minute}}, + {RampPercentage: 75, PauseDuration: metav1.Duration{Duration: 2 * time.Minute}}, + }, + expectedGate: nil, + }, + { + name: "empty strategy defaults to AllAtOnce", + rollbackStrategy: temporaliov1alpha1.RollbackStrategy{ + Strategy: "", + }, + expectedStrategy: temporaliov1alpha1.UpdateAllAtOnce, + expectedSteps: nil, + expectedGate: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := convertRollbackToRolloutStrategy(tc.rollbackStrategy) + + assert.Equal(t, tc.expectedStrategy, result.Strategy) + assert.Equal(t, tc.expectedSteps, result.Steps) + assert.Equal(t, tc.expectedGate, result.Gate) + }) + } +} + +func TestGetVersionConfigDiff_RollbackScenario(t *testing.T) { + logger := logr.Discard() + workerDeploymentName := "test-deployment" + now := time.Now() + zeroRamp := float32(0) + + testCases := []struct { + name string + status *temporaliov1alpha1.TemporalWorkerDeploymentStatus + temporalState *temporal.TemporalWorkerState + config *Config + expectSetCurrent bool + description string + }{ + { + name: "rollback with default 
AllAtOnce strategy", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + Status: temporaliov1alpha1.VersionStatusInactive, + HealthySince: &metav1.Time{Time: now}, + }, + }, + VersionConflictToken: []byte("token123"), + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &now, + Status: temporaliov1alpha1.VersionStatusInactive, + }, + }, + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {RampPercentage: 25, PauseDuration: metav1.Duration{Duration: 5 * time.Minute}}, + {RampPercentage: 50, PauseDuration: metav1.Duration{Duration: 5 * time.Minute}}, + }, + }, + RollbackStrategy: &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + MaxVersionAge: &metav1.Duration{Duration: defaults.RollbackMaxVersionAge}, + }, + }, + expectSetCurrent: true, + description: "Rollback with AllAtOnce should immediately set version as current", + }, + { + name: "rollback with Progressive strategy", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v2", + Status: temporaliov1alpha1.VersionStatusCurrent, + HealthySince: &metav1.Time{Time: now}, + }, + }, + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + Status: temporaliov1alpha1.VersionStatusInactive, + HealthySince: &metav1.Time{Time: now}, + }, + RampPercentage: &zeroRamp, + }, + VersionConflictToken: 
[]byte("token123"), + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &now, + Status: temporaliov1alpha1.VersionStatusInactive, + }, + "build-v2": { + BuildID: "build-v2", + LastCurrentTime: nil, + Status: temporaliov1alpha1.VersionStatusCurrent, + }, + }, + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + RollbackStrategy: &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {RampPercentage: 50, PauseDuration: metav1.Duration{Duration: time.Minute}}, + }, + MaxVersionAge: &metav1.Duration{Duration: defaults.RollbackMaxVersionAge}, + }, + }, + expectSetCurrent: false, + description: "Rollback with Progressive should not immediately set as current", + }, + { + name: "normal rollout when LastCurrentTime is nil", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + Status: temporaliov1alpha1.VersionStatusCurrent, + HealthySince: &metav1.Time{Time: now}, + }, + }, + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v2", + Status: temporaliov1alpha1.VersionStatusInactive, + HealthySince: &metav1.Time{Time: now}, + }, + }, + VersionConflictToken: []byte("token456"), + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &now, + Status: temporaliov1alpha1.VersionStatusCurrent, + }, + "build-v2": { + BuildID: "build-v2", + LastCurrentTime: nil, + Status: temporaliov1alpha1.VersionStatusInactive, + }, + }, + }, + 
config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateProgressive, + Steps: []temporaliov1alpha1.RolloutStep{ + {RampPercentage: 50, PauseDuration: metav1.Duration{Duration: time.Minute}}, + }, + }, + RollbackStrategy: &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, // would set current immediately if wrongly used + MaxVersionAge: &metav1.Duration{Duration: defaults.RollbackMaxVersionAge}, + }, + }, + expectSetCurrent: false, + description: "New version (nil LastCurrentTime) should use RolloutStrategy, not rollback AllAtOnce", + }, + { + name: "rollback when target version is drained", + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v2", + Status: temporaliov1alpha1.VersionStatusCurrent, + HealthySince: &metav1.Time{Time: now}, + }, + }, + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-v1", + Status: temporaliov1alpha1.VersionStatusDrained, + HealthySince: &metav1.Time{Time: now}, + }, + }, + VersionConflictToken: []byte("token789"), + }, + temporalState: &temporal.TemporalWorkerState{ + Versions: map[string]*temporal.VersionInfo{ + "build-v1": { + BuildID: "build-v1", + LastCurrentTime: &now, + Status: temporaliov1alpha1.VersionStatusDrained, + }, + "build-v2": { + BuildID: "build-v2", + LastCurrentTime: nil, + Status: temporaliov1alpha1.VersionStatusCurrent, + }, + }, + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + RollbackStrategy: &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + MaxVersionAge: &metav1.Duration{Duration: defaults.RollbackMaxVersionAge}, 
+ }, + }, + expectSetCurrent: true, + description: "Rollback to a previously-drained version should be detected via LastCurrentTime and immediately set as current", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := getVersionConfigDiff(logger, tc.status, tc.temporalState, tc.config, workerDeploymentName) + + if result == nil { + t.Fatal("expected non-nil VersionConfig") + } + + assert.Equal(t, tc.expectSetCurrent, result.SetCurrent, tc.description) + assert.Equal(t, tc.status.VersionConflictToken, result.ConflictToken) + }) + } +} diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index 5834a026..ff3824cf 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -38,6 +38,10 @@ type VersionInfo struct { // - Strategy is Progressive, and // - Presence of unversioned pollers in all task queues of target version cannot be confirmed. AllTaskQueuesHaveUnversionedPoller bool + // LastCurrentTime is the timestamp when this version last became current. + // Used to determine if this is a rollback scenario (version was previously current). + // Nil if the version was never current or if the server doesn't support this field. 
+ LastCurrentTime *time.Time } // TemporalWorkerState represents the state of a worker deployment in Temporal @@ -63,7 +67,7 @@ func GetWorkerDeploymentState( namespace string, k8sDeployments map[string]*appsv1.Deployment, targetBuildID string, - strategy temporaliov1alpha1.DefaultVersionUpdateStrategy, + strategy temporaliov1alpha1.VersionRolloutStrategy, controllerIdentity string, ) (*TemporalWorkerState, error) { state := &TemporalWorkerState{ @@ -177,6 +181,11 @@ func GetWorkerDeploymentState( } + if lct := version.GetLastCurrentTime(); lct != nil { + t := lct.AsTime() + versionInfo.LastCurrentTime = &t + } + state.Versions[version.DeploymentVersion.BuildId] = versionInfo } diff --git a/internal/testhelpers/make.go b/internal/testhelpers/make.go index d7c41169..cfb3cbd9 100644 --- a/internal/testhelpers/make.go +++ b/internal/testhelpers/make.go @@ -6,6 +6,7 @@ import ( "github.com/pborman/uuid" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/defaults" "github.com/temporalio/temporal-worker-controller/internal/k8s" "go.temporal.io/server/common/worker_versioning" corev1 "k8s.io/api/core/v1" @@ -23,21 +24,10 @@ func MakeTWD( replicas int32, podSpec corev1.PodTemplateSpec, rolloutStrategy *temporaliov1alpha1.RolloutStrategy, + rollbackStrategy *temporaliov1alpha1.RollbackStrategy, sunsetStrategy *temporaliov1alpha1.SunsetStrategy, workerOpts *temporaliov1alpha1.WorkerOptions, ) *temporaliov1alpha1.TemporalWorkerDeployment { - r := temporaliov1alpha1.RolloutStrategy{} - s := temporaliov1alpha1.SunsetStrategy{} - w := temporaliov1alpha1.WorkerOptions{} - if rolloutStrategy != nil { - r = *rolloutStrategy - } - if sunsetStrategy != nil { - s = *sunsetStrategy - } - if workerOpts != nil { - w = *workerOpts - } twd := &temporaliov1alpha1.TemporalWorkerDeployment{ TypeMeta: metav1.TypeMeta{ @@ -51,17 +41,58 @@ func MakeTWD( Labels: map[string]string{"app": "test-worker"}, }, 
Spec: temporaliov1alpha1.TemporalWorkerDeploymentSpec{ - Replicas: &replicas, - Template: podSpec, - RolloutStrategy: r, - SunsetStrategy: s, - WorkerOptions: w, + Replicas: &replicas, + Template: podSpec, + RolloutStrategy: *rolloutStrategy, + RollbackStrategy: rollbackStrategy, + SunsetStrategy: *sunsetStrategy, + WorkerOptions: *workerOpts, }, } twd.Name = twd.ObjectMeta.Name return twd } +func MakeDefaultTWD( + name string, + namespace string, + replicas int32, + podSpec corev1.PodTemplateSpec, +) *temporaliov1alpha1.TemporalWorkerDeployment { + // Rollout doesn't have a default because it's required in the spec + // Arbitrarily choose AllAtOnce here to avoid empty values + rolloutStrategy := &temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + } + + rollbackStrategy := &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + } + + sunsetStrategy := &temporaliov1alpha1.SunsetStrategy{ + ScaledownDelay: &metav1.Duration{Duration: defaults.ScaledownDelay}, + DeleteDelay: &metav1.Duration{Duration: defaults.DeleteDelay}, + } + + workerOpts := &temporaliov1alpha1.WorkerOptions{ + TemporalConnectionRef: temporaliov1alpha1.TemporalConnectionReference{ + Name: "default-connection", + }, + TemporalNamespace: "default", + } + + return MakeTWD( + name, + namespace, + replicas, + podSpec, + rolloutStrategy, + rollbackStrategy, + sunsetStrategy, + workerOpts, + ) +} + // MakePodSpec creates a pod spec with the given containers, labels, and task queue func MakePodSpec(containers []corev1.Container, labels map[string]string, taskQueue string) corev1.PodTemplateSpec { for i := range containers { @@ -106,7 +137,7 @@ func SetTaskQueue(podSpec corev1.PodTemplateSpec, taskQueue string) corev1.PodTe } func MakeTWDWithImage(name, namespace, imageName string) *temporaliov1alpha1.TemporalWorkerDeployment { - return MakeTWD(name, namespace, 1, MakePodSpec([]corev1.Container{{Image: imageName}}, nil, ""), nil, nil, nil) 
+ return MakeDefaultTWD(name, namespace, 1, MakePodSpec([]corev1.Container{{Image: imageName}}, nil, "")) } // MakeBuildID computes a build id based on the image and @@ -132,7 +163,7 @@ func MakeBuildID(twdName, imageName, unsafeCustomBuildID string, podSpec *corev1 } func MakeTWDWithName(name, namespace string) *temporaliov1alpha1.TemporalWorkerDeployment { - twd := MakeTWD(name, namespace, 1, MakePodSpec(nil, nil, ""), nil, nil, nil) + twd := MakeDefaultTWD(name, namespace, 1, MakePodSpec(nil, nil, "")) twd.ObjectMeta.Name = name twd.Name = name return twd diff --git a/internal/testhelpers/test_builder.go b/internal/testhelpers/test_builder.go index 89966c40..635ac9e9 100644 --- a/internal/testhelpers/test_builder.go +++ b/internal/testhelpers/test_builder.go @@ -58,6 +58,23 @@ func (b *TemporalWorkerDeploymentBuilder) WithProgressiveStrategy(steps ...tempo return b } +// WithRollbackAllAtOnceStrategy sets the rollback strategy to all-at-once +func (b *TemporalWorkerDeploymentBuilder) WithRollbackAllAtOnceStrategy() *TemporalWorkerDeploymentBuilder { + b.twd.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackAllAtOnce, + } + return b +} + +// WithRollbackProgressiveStrategy sets the rollback strategy to progressive with given steps +func (b *TemporalWorkerDeploymentBuilder) WithRollbackProgressiveStrategy(steps ...temporaliov1alpha1.RolloutStep) *TemporalWorkerDeploymentBuilder { + b.twd.Spec.RollbackStrategy = &temporaliov1alpha1.RollbackStrategy{ + Strategy: temporaliov1alpha1.RollbackProgressive, + Steps: steps, + } + return b +} + // WithGate sets the rollout strategy have a gate workflow func (b *TemporalWorkerDeploymentBuilder) WithGate(expectSuccess bool) *TemporalWorkerDeploymentBuilder { if expectSuccess { @@ -233,6 +250,11 @@ type TestCase struct { // If starting from a particular state, specify that in input.Status. 
twd *temporaliov1alpha1.TemporalWorkerDeployment + // previouslyCurrentImages is a list of image names whose corresponding versions should be + // registered in Temporal and briefly promoted to current (giving them a LastCurrentTime) + // before the test runs. The actual current version is restored afterward. + previouslyCurrentImages []string + // existingDeploymentReplicas specifies the number of replicas for each deprecated build. // TemporalWorkerDeploymentStatus only tracks the names of the Deployments for deprecated // versions, so for test scenarios that start with existing deprecated version Deployments, @@ -315,6 +337,10 @@ func (tc *TestCase) GetValidatorFunc() func(t *testing.T, ctx context.Context, t return tc.validatorFunc } +func (tc *TestCase) GetPreviouslyCurrentImages() []string { + return tc.previouslyCurrentImages +} + // TestCaseBuilder provides a fluent interface for building test cases type TestCaseBuilder struct { name string @@ -327,6 +353,8 @@ type TestCaseBuilder struct { expectedDeploymentInfos []DeploymentInfo waitTime *time.Duration + previouslyCurrentImages []string + setupFunc func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) twdMutatorFunc func(*temporaliov1alpha1.TemporalWorkerDeployment) postTWDCreateFunc func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) @@ -357,6 +385,15 @@ func NewTestCaseWithValues(name, k8sNamespace, temporalNamespace string) *TestCa } } +// WithPreviouslyCurrentVersions specifies images whose versions should be registered in Temporal and +// briefly promoted to current (giving them a LastCurrentTime) before the test runs. The actual +// current version declared in WithStatus is restored afterward. Use this for rollback test scenarios +// where the rollback target was previously the current version. 
+func (tcb *TestCaseBuilder) WithPreviouslyCurrentVersions(imageNames ...string) *TestCaseBuilder { + tcb.previouslyCurrentImages = imageNames + return tcb +} + // WithSetupFunction defines a function that the test case will call while setting up the state, after creating the initial Status. func (tcb *TestCaseBuilder) WithSetupFunction(f func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv)) *TestCaseBuilder { tcb.setupFunc = f @@ -467,11 +504,12 @@ func (tcb *TestCaseBuilder) WithExpectedStatus(statusBuilder *StatusBuilder) *Te // Build returns the constructed test case func (tcb *TestCaseBuilder) Build() TestCase { ret := TestCase{ - setupFunc: tcb.setupFunc, - twdMutatorFunc: tcb.twdMutatorFunc, - postTWDCreateFunc: tcb.postTWDCreateFunc, - validatorFunc: tcb.validatorFunc, - waitTime: tcb.waitTime, + setupFunc: tcb.setupFunc, + twdMutatorFunc: tcb.twdMutatorFunc, + postTWDCreateFunc: tcb.postTWDCreateFunc, + validatorFunc: tcb.validatorFunc, + waitTime: tcb.waitTime, + previouslyCurrentImages: tcb.previouslyCurrentImages, twd: tcb.twdBuilder. WithName(tcb.name). WithNamespace(tcb.k8sNamespace). diff --git a/internal/testhelpers/workers.go b/internal/testhelpers/workers.go index d16d4ce0..f90903bf 100644 --- a/internal/testhelpers/workers.go +++ b/internal/testhelpers/workers.go @@ -55,6 +55,22 @@ func newVersionedWorker(ctx context.Context, podTemplateSpec corev1.PodTemplateS return NewWorker(ctx, temporalDeploymentName, workerBuildID, temporalTaskQueue, temporalHostPort, temporalNamespace, true) } +// StartVersionedWorker creates a versioned worker and registers a dummy workflow on it. +// This is used to register a build ID with a Temporal worker deployment to set LastCurrentTime. +// Returns a stop function that must be called when done. 
+func StartVersionedWorker(ctx context.Context, temporalDeploymentName, workerBuildID, temporalTaskQueue, temporalHostPort, temporalNamespace string) (stopFunc func(), err error) { + w, stop, err := NewWorker(ctx, temporalDeploymentName, workerBuildID, temporalTaskQueue, temporalHostPort, temporalNamespace, true) + if err != nil { + return nil, err + } + w.RegisterWorkflow(func(workflow.Context) error { return nil }) + if err := w.Start(); err != nil { + stop() + return nil, err + } + return stop, nil +} + func NewWorker( ctx context.Context, temporalDeploymentName, workerBuildID, temporalTaskQueue, temporalHostPort, temporalNamespace string, diff --git a/internal/tests/internal/deployment_controller.go b/internal/tests/internal/deployment_controller.go index 371db580..a3fbd31b 100644 --- a/internal/tests/internal/deployment_controller.go +++ b/internal/tests/internal/deployment_controller.go @@ -32,13 +32,16 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { } } -func startAndStopWorker(t *testing.T, ctx context.Context, k8sClient client.Client, deploymentName, namespace string) { +// startWorker starts a single worker for the given deployment and returns a stop function. +// The caller is responsible for calling the stop function when done. 
+func startWorker(t *testing.T, ctx context.Context, k8sClient client.Client, deploymentName, namespace string) func() { var deployment appsv1.Deployment if err := k8sClient.Get(ctx, types.NamespacedName{ Name: deploymentName, Namespace: namespace, }, &deployment); err != nil { t.Fatalf("failed to get deployment: %v", err) + return func() {} } startedCh := make(chan struct{}) @@ -56,11 +59,10 @@ func startAndStopWorker(t *testing.T, ctx context.Context, k8sClient client.Clie // wait for worker to start <-startedCh - time.Sleep(1 * time.Second) - - // kill worker - if stop != nil { - stop() + return func() { + if stop != nil { + stop() + } } } @@ -157,6 +159,7 @@ func makePreliminaryStatusTrue( t *testing.T, env testhelpers.TestEnv, twd *temporaliov1alpha1.TemporalWorkerDeployment, + previouslyCurrentImages []string, ) { t.Logf("Creating starting test env based on input.Status") @@ -169,6 +172,25 @@ func makePreliminaryStatusTrue( loopDefers = append(loopDefers, func() { handleStopFuncs(workerStopFuncs) }) } + // Register versions that were previously current so they have LastCurrentTime set in Temporal + workerDeploymentName := k8s.ComputeWorkerDeploymentName(twd) + for _, image := range previouslyCurrentImages { + buildID := testhelpers.MakeBuildID(twd.Name, image, "", nil) + t.Logf("Setting LastCurrentTime for previously-current version %q (buildID %q)", image, buildID) + stopFunc, err := testhelpers.StartVersionedWorker(ctx, workerDeploymentName, buildID, twd.Name, + env.Ts.GetFrontendHostPort(), env.Ts.GetDefaultNamespace()) + if err != nil { + t.Errorf("failed to start worker for previously-current build %q: %v", buildID, err) + continue + } + waitForVersionRegistrationInDeployment(t, ctx, env.Ts, &worker.WorkerDeploymentVersion{ + DeploymentName: workerDeploymentName, + BuildID: buildID, + }) + setCurrentVersion(t, ctx, env.Ts, workerDeploymentName, buildID) + loopDefers = append(loopDefers, stopFunc) + } + if tv := twd.Status.TargetVersion; tv.BuildID != "" 
{ t.Logf("Setting up target version %v with status %v", tv.BuildID, tv.Status) var rampPercentage *float32 @@ -225,9 +247,15 @@ func createStatus( setRampingVersion(t, ctx, env.Ts, v.DeploymentName, "", 0) case temporaliov1alpha1.VersionStatusDrained: if env.ExistingDeploymentReplicas[v.BuildID] == 0 { - startAndStopWorker(t, ctx, env.K8sClient, expectedDeploymentName, prevTWD.Namespace) + // Keep the temporary worker alive until setCurrentVersion completes. + // Stopping it before the call risks the Worker Deployment entry being + // cleaned up (under short PollerHistoryTTL) before registration is confirmed. + stopTemporaryWorker := startWorker(t, ctx, env.K8sClient, expectedDeploymentName, prevTWD.Namespace) + setCurrentVersion(t, ctx, env.Ts, v.DeploymentName, v.BuildID) + stopTemporaryWorker() + } else { + setCurrentVersion(t, ctx, env.Ts, v.DeploymentName, v.BuildID) } - setCurrentVersion(t, ctx, env.Ts, v.DeploymentName, v.BuildID) setCurrentVersion(t, ctx, env.Ts, v.DeploymentName, "") } } diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 53fe2dd6..809e3e2a 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -711,6 +711,73 @@ func TestIntegration(t *testing.T) { }) } + rollbackStrategyTestCases := []testCase{ + { + name: "all-at-once-rollback-expect-immediate-promotion", + builder: testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithRollbackAllAtOnceStrategy(). + WithTargetTemplate("v1"). + WithStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v2", temporaliov1alpha1.VersionStatusCurrent, -1, true, true). + WithCurrentVersion("v2", true, true), + ), + ). + WithExistingDeployments( + testhelpers.NewDeploymentInfo("v2", 1), + ). + WithPreviouslyCurrentVersions("v1"). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). 
+ WithTargetVersion("v1", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). + WithCurrentVersion("v1", true, false). + WithDeprecatedVersions( + testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, false, true), + ), + ). + WithExpectedDeployments( + testhelpers.NewDeploymentInfo("v2", 1), + ), + }, + { + name: "progressive-rollback-expect-ramp-at-first-step", + builder: testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithRollbackProgressiveStrategy(testhelpers.ProgressiveStep(50, time.Hour)). + WithTargetTemplate("v1"). + WithStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v2", temporaliov1alpha1.VersionStatusCurrent, -1, true, true). + WithCurrentVersion("v2", true, true), + ), + ). + WithExistingDeployments( + testhelpers.NewDeploymentInfo("v2", 1), + ). + WithPreviouslyCurrentVersions("v1"). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1", temporaliov1alpha1.VersionStatusRamping, 50, true, false). + WithCurrentVersion("v2", true, false), + ). 
+ WithExpectedDeployments( + testhelpers.NewDeploymentInfo("v2", 1), + ), + }, + } + + for _, tc := range rollbackStrategyTestCases { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + testTemporalWorkerDeploymentCreation(ctx, t, k8sClient, mgr, ts, tc.builder.BuildWithValues(tc.name, testNamespace.Name, ts.GetDefaultNamespace())) + }) + } + // Create short TTL test Temporal server and client dcShortTTL := dynamicconfig.NewMemoryClient() // make versions eligible for deletion faster @@ -987,7 +1054,7 @@ func testTemporalWorkerDeploymentCreation( ExpectedDeploymentReplicas: tc.GetExpectedDeploymentReplicas(), } - makePreliminaryStatusTrue(ctx, t, env, twd) + makePreliminaryStatusTrue(ctx, t, env, twd, tc.GetPreviouslyCurrentImages()) // verify that temporal state matches the preliminary status, to confirm that makePreliminaryStatusTrue worked verifyTemporalStateMatchesStatusEventually(t, ctx, ts, twd, twd.Status, 30*time.Second, 5*time.Second) @@ -1007,6 +1074,17 @@ func testTemporalWorkerDeploymentCreation( t.Fatalf("failed to create TemporalWorkerDeployment: %v", err) } + // Immediately apply the input status before the controller's first reconcile. + // k8sClient.Create strips the status subresource, so without this the first reconcile + // always sees an empty status (CurrentVersion == nil), which triggers the fast-track. + // The controller reconcile is queued asynchronously via the watch/informer + // path, so this synchronous Status().Update() reliably precedes it. + if twd.Status.TargetVersion.BuildID != "" { + if err := k8sClient.Status().Update(ctx, twd); err != nil { + t.Fatalf("failed to pre-apply TWD status: %v", err) + } + } + // Hook: runs after TWD creation but before waiting for the target Deployment. // Use this to assert blocking behaviour and then unblock the rollout. if f := tc.GetPostTWDCreateFunc(); f != nil {