Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions api/v1alpha1/testdata/keda/scaledobject-crd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: scaledobjects.keda.sh
spec:
group: keda.sh
names:
kind: ScaledObject
listKind: ScaledObjectList
plural: scaledobjects
singular: scaledobject
shortNames:
- so
scope: Namespaced
versions:
- name: v1alpha1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
x-kubernetes-preserve-unknown-fields: true
subresources:
status: {}
4 changes: 3 additions & 1 deletion api/v1alpha1/webhook_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ var _ = BeforeSuite(func() {
CRDDirectoryPaths: []string{
// CRDs live in the crds chart's templates directory
filepath.Join("..", "..", "helm", "temporal-worker-controller-crds", "templates"),
// Stripped KEDA CRDs vendored under testdata for envtest integration tests
filepath.Join("testdata", "keda"),
},
ErrorIfCRDPathMissing: true,
WebhookInstallOptions: envtest.WebhookInstallOptions{
Expand Down Expand Up @@ -130,7 +132,7 @@ var _ = BeforeSuite(func() {
// ALLOWED_KINDS mirrors the default Helm values so integration tests can create HPAs.
Expect(os.Setenv("POD_NAMESPACE", "test-system")).To(Succeed())
Expect(os.Setenv("SERVICE_ACCOUNT_NAME", "test-controller")).To(Succeed())
Expect(os.Setenv("ALLOWED_KINDS", "HorizontalPodAutoscaler")).To(Succeed())
Expect(os.Setenv("ALLOWED_KINDS", "HorizontalPodAutoscaler,ScaledObject")).To(Succeed())

err = NewWorkerResourceTemplateValidator(mgr).SetupWebhookWithManager(mgr)
Expect(err).NotTo(HaveOccurred())
Expand Down
38 changes: 24 additions & 14 deletions api/v1alpha1/workerresourcetemplate_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ var ControllerOwnedMetricLabelKeys = []string{
"temporal_namespace",
}

// scaleToZeroRationale is the shared Temporal-side explanation used by both the HPA
// minReplicas=0 guard and the ScaledObject minReplicaCount/idleReplicaCount guard.
// Kept in one place so the explanation stays consistent across kinds.
const scaleToZeroRationale = "scaling Temporal workers to zero is not currently safe: " +
"Temporal's approximate_backlog_count metric stops being emitted when the task queue is idle " +
"with no pollers, so a metric-based autoscaler cannot detect a new backlog and scale back up " +
"from zero once all workers are gone"

// NewWorkerResourceTemplateValidator creates a validator from a manager.
//
// Three environment variables are read at startup (all injected by the Helm chart):
Expand Down Expand Up @@ -261,25 +269,27 @@ func validateWorkerResourceTemplateSpec(spec WorkerResourceTemplateSpec, allowed
if innerSpec, ok := obj["spec"].(map[string]interface{}); ok {
innerSpecPath := field.NewPath("spec").Child("template").Child("spec")

// 4. minReplicas must not be 0.
// Scaling to zero is not safe with Temporal's approximate_backlog_count metric: that metric
// is only emitted while the task queue is loaded in memory (i.e. while at least one worker
// is polling). If all workers are scaled to zero and the task queue goes idle for ~5 minutes,
// the metric stops being emitted and resets to zero. If new tasks then arrive but no worker
// polls, the metric remains zero — making it impossible for a metric-based autoscaler to
// detect the backlog and scale back up. Until Temporal makes this a reliable metric for
// scaling workers to zero and back, minReplicas=0 is rejected.
if minReplicas, exists := innerSpec["minReplicas"]; exists {
if v, ok := minReplicas.(float64); ok && v == 0 {
// 4. Scale-to-zero guard — kind-aware. Other kinds (e.g. PodDisruptionBudget)
// have no scale-to-zero concept and are intentionally skipped.
switch kind {
case "HorizontalPodAutoscaler":
if v, ok := innerSpec["minReplicas"].(float64); ok && v == 0 {
allErrs = append(allErrs, field.Invalid(
innerSpecPath.Child("minReplicas"),
0,
"minReplicas must not be 0; scaling Temporal workers to zero is not currently safe: "+
"Temporal's approximate_backlog_count metric stops being emitted when the task queue is idle "+
"with no pollers, so a metric-based autoscaler cannot detect a new backlog and scale back up "+
"from zero once all workers are gone",
"minReplicas must not be 0; "+scaleToZeroRationale,
))
}
case "ScaledObject":
for _, fieldName := range []string{"minReplicaCount", "idleReplicaCount"} {
if v, ok := innerSpec[fieldName].(float64); ok && v == 0 {
allErrs = append(allErrs, field.Invalid(
innerSpecPath.Child(fieldName),
0,
fieldName+" must not be 0; "+scaleToZeroRationale,
))
}
}
}

// 5. scaleTargetRef: if absent or empty ({}), the controller injects it to point at
Expand Down
59 changes: 59 additions & 0 deletions api/v1alpha1/workerresourcetemplate_webhook_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ func hpaObjForIntegration() map[string]interface{} {
}
}

// scaledObjectForIntegration returns a minimal valid ScaledObject embedded object spec,
// including a token in the PromQL query to confirm the webhook accepts templates
// that contain substitution tokens (substitution itself happens at controller render time).
func scaledObjectForIntegration() map[string]interface{} {
return map[string]interface{}{
"apiVersion": "keda.sh/v1alpha1",
"kind": "ScaledObject",
"spec": map[string]interface{}{
"scaleTargetRef": map[string]interface{}{},
"minReplicaCount": float64(1),
"maxReplicaCount": float64(5),
"triggers": []interface{}{
map[string]interface{}{
"type": "prometheus",
"metadata": map[string]interface{}{
"serverAddress": "http://prom:9090",
"threshold": "1",
"query": `sum(metric{build="__TEMPORAL_WORKER_BUILD_ID__"})`,
},
},
},
},
}
}

// makeWRTForWebhook builds a WorkerResourceTemplate in the given namespace.
func makeWRTForWebhook(name, ns, workerDeploymentRef string, embeddedObj map[string]interface{}) *WorkerResourceTemplate {
raw, _ := json.Marshal(embeddedObj)
Expand Down Expand Up @@ -93,6 +118,31 @@ func grantControllerSAHPACreateAccess(ns string) {
Expect(k8sClient.Create(ctx, rb)).To(Succeed())
}

// grantControllerSAScaledObjectCreateAccess creates a Role granting the controller SA
// (system:serviceaccount:test-system:test-controller) create access to keda.sh/scaledobjects
// in ns. Mirrors grantControllerSAHPACreateAccess.
func grantControllerSAScaledObjectCreateAccess(ns string) {
role := &rbacv1.Role{
ObjectMeta: metav1.ObjectMeta{Name: "sa-so-creator", Namespace: ns},
Rules: []rbacv1.PolicyRule{
{
APIGroups: []string{"keda.sh"},
Resources: []string{"scaledobjects"},
Verbs: []string{"create", "update", "delete"},
},
},
}
Expect(k8sClient.Create(ctx, role)).To(Succeed())
rb := &rbacv1.RoleBinding{
ObjectMeta: metav1.ObjectMeta{Name: "sa-so-creator-rb", Namespace: ns},
RoleRef: rbacv1.RoleRef{APIGroup: "rbac.authorization.k8s.io", Kind: "Role", Name: "sa-so-creator"},
Subjects: []rbacv1.Subject{
{Kind: "ServiceAccount", Name: "test-controller", Namespace: "test-system"},
},
}
Expect(k8sClient.Create(ctx, rb)).To(Succeed())
}

// grantUserWRTCreateAccess grants a user permission to create WRTs in ns.
// This is required so the kube-apiserver's RBAC check passes before calling the webhook.
func grantUserWRTCreateAccess(ns, username string) {
Expand Down Expand Up @@ -194,6 +244,15 @@ var _ = Describe("WorkerResourceTemplate webhook integration", func() {
Expect(err.Error()).To(ContainSubstring("not authorized"))
})

It("allows creation of a ScaledObject-backed WRT when controller SA has keda.sh permission", func() {
ns := makeTestNamespace("wh-so-happy")
grantControllerSAScaledObjectCreateAccess(ns)

wrt := makeWRTForWebhook("t-so-happy", ns, "my-worker", scaledObjectForIntegration())
Expect(k8sClient.Create(ctx, wrt)).To(Succeed(),
"admission must succeed when controller SA can create ScaledObjects")
})

// temporalWorkerDeploymentRef.name immutability enforced via a real HTTP update request.
// First create a valid WRT, then attempt to change temporalWorkerDeploymentRef.name via k8sClient.Update.
// The webhook must reject the update with a message about immutability.
Expand Down
76 changes: 75 additions & 1 deletion api/v1alpha1/workerresourcetemplate_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ func validHPAObject() map[string]interface{} {
}
}

// validScaledObjectObject returns a minimal valid ScaledObject embedded object spec.
func validScaledObjectObject() map[string]interface{} {
return map[string]interface{}{
"apiVersion": "keda.sh/v1alpha1",
"kind": "ScaledObject",
"spec": map[string]interface{}{
"minReplicaCount": float64(1),
"maxReplicaCount": float64(10),
"triggers": []interface{}{
map[string]interface{}{
"type": "prometheus",
"metadata": map[string]interface{}{"query": "1", "threshold": "1", "serverAddress": "http://prom"},
},
},
},
}
}

// namespacedScope implements meta.RESTScope for namespace-scoped resources.
type namespacedScope struct{}

Expand Down Expand Up @@ -72,8 +90,9 @@ func newValidatorNoAPI() *temporaliov1alpha1.WorkerResourceTemplateValidator {
RESTMapper: newFakeRESTMapper(
schema.GroupVersionKind{Group: "autoscaling", Version: "v2", Kind: "HorizontalPodAutoscaler"},
schema.GroupVersionKind{Group: "policy", Version: "v1", Kind: "PodDisruptionBudget"},
schema.GroupVersionKind{Group: "keda.sh", Version: "v1alpha1", Kind: "ScaledObject"},
),
AllowedKinds: []string{"HorizontalPodAutoscaler", "PodDisruptionBudget"},
AllowedKinds: []string{"HorizontalPodAutoscaler", "PodDisruptionBudget", "ScaledObject"},
}
}

Expand Down Expand Up @@ -594,3 +613,58 @@ func TestWorkerResourceTemplate_MultipleErrors(t *testing.T) {
assert.Contains(t, err.Error(), "metadata.name is generated by the controller")
assert.Contains(t, err.Error(), "minReplicas must not be 0")
}

func TestWorkerResourceTemplate_ValidateScaledObject(t *testing.T) {
v := newValidatorNoAPI()

t.Run("valid ScaledObject is accepted", func(t *testing.T) {
wrt := newWRT("so", "mywd", validScaledObjectObject())
_, err := v.ValidateCreate(context.Background(), wrt)
assert.NoError(t, err)
})

t.Run("minReplicaCount=0 is rejected with shared rationale", func(t *testing.T) {
obj := validScaledObjectObject()
obj["spec"].(map[string]interface{})["minReplicaCount"] = float64(0)
wrt := newWRT("so", "mywd", obj)
_, err := v.ValidateCreate(context.Background(), wrt)
require.Error(t, err)
assert.Contains(t, err.Error(), "minReplicaCount must not be 0")
assert.Contains(t, err.Error(), "approximate_backlog_count metric stops being emitted")
})

t.Run("idleReplicaCount=0 is rejected with shared rationale", func(t *testing.T) {
obj := validScaledObjectObject()
obj["spec"].(map[string]interface{})["idleReplicaCount"] = float64(0)
wrt := newWRT("so", "mywd", obj)
_, err := v.ValidateCreate(context.Background(), wrt)
require.Error(t, err)
assert.Contains(t, err.Error(), "idleReplicaCount must not be 0")
assert.Contains(t, err.Error(), "approximate_backlog_count metric stops being emitted")
})

t.Run("non-empty scaleTargetRef is rejected (reuses existing check)", func(t *testing.T) {
obj := validScaledObjectObject()
obj["spec"].(map[string]interface{})["scaleTargetRef"] = map[string]interface{}{
"name": "user-set-name",
}
wrt := newWRT("so", "mywd", obj)
_, err := v.ValidateCreate(context.Background(), wrt)
require.Error(t, err)
assert.Contains(t, err.Error(), "scaleTargetRef")
})

t.Run("ScaledObject not in AllowedKinds is rejected", func(t *testing.T) {
restricted := &temporaliov1alpha1.WorkerResourceTemplateValidator{
Client: fake.NewClientBuilder().Build(),
RESTMapper: newFakeRESTMapper(
schema.GroupVersionKind{Group: "keda.sh", Version: "v1alpha1", Kind: "ScaledObject"},
),
AllowedKinds: []string{"HorizontalPodAutoscaler"},
}
wrt := newWRT("so", "mywd", validScaledObjectObject())
_, err := restricted.ValidateCreate(context.Background(), wrt)
require.Error(t, err)
assert.Contains(t, err.Error(), "not in the allowed list")
})
}
Loading