diff --git a/api/v1alpha1/testdata/keda/scaledobject-crd.yaml b/api/v1alpha1/testdata/keda/scaledobject-crd.yaml new file mode 100644 index 00000000..c611164a --- /dev/null +++ b/api/v1alpha1/testdata/keda/scaledobject-crd.yaml @@ -0,0 +1,24 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: scaledobjects.keda.sh +spec: + group: keda.sh + names: + kind: ScaledObject + listKind: ScaledObjectList + plural: scaledobjects + singular: scaledobject + shortNames: + - so + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + x-kubernetes-preserve-unknown-fields: true + subresources: + status: {} diff --git a/api/v1alpha1/webhook_suite_test.go b/api/v1alpha1/webhook_suite_test.go index bce3c926..c9364a00 100644 --- a/api/v1alpha1/webhook_suite_test.go +++ b/api/v1alpha1/webhook_suite_test.go @@ -68,6 +68,8 @@ var _ = BeforeSuite(func() { CRDDirectoryPaths: []string{ // CRDs live in the crds chart's templates directory filepath.Join("..", "..", "helm", "temporal-worker-controller-crds", "templates"), + // Stripped KEDA CRDs vendored under testdata for envtest integration tests + filepath.Join("testdata", "keda"), }, ErrorIfCRDPathMissing: true, WebhookInstallOptions: envtest.WebhookInstallOptions{ @@ -130,7 +132,7 @@ var _ = BeforeSuite(func() { // ALLOWED_KINDS mirrors the default Helm values so integration tests can create HPAs. Expect(os.Setenv("POD_NAMESPACE", "test-system")).To(Succeed()) Expect(os.Setenv("SERVICE_ACCOUNT_NAME", "test-controller")).To(Succeed()) - Expect(os.Setenv("ALLOWED_KINDS", "HorizontalPodAutoscaler")).To(Succeed()) + Expect(os.Setenv("ALLOWED_KINDS", "HorizontalPodAutoscaler,ScaledObject")).To(Succeed()) err = NewWorkerResourceTemplateValidator(mgr).SetupWebhookWithManager(mgr) Expect(err).NotTo(HaveOccurred()) diff --git a/api/v1alpha1/workerresourcetemplate_webhook.go b/api/v1alpha1/workerresourcetemplate_webhook.go index 5475a4a4..6ac8244a 100644 --- a/api/v1alpha1/workerresourcetemplate_webhook.go +++ b/api/v1alpha1/workerresourcetemplate_webhook.go @@ -46,6 +46,14 @@ var ControllerOwnedMetricLabelKeys = []string{ "temporal_namespace", } +// scaleToZeroRationale is the shared Temporal-side explanation used by both the HPA +// minReplicas=0 guard and the ScaledObject minReplicaCount/idleReplicaCount guard. +// Kept in one place so the explanation stays consistent across kinds. +const scaleToZeroRationale = "scaling Temporal workers to zero is not currently safe: " + + "Temporal's approximate_backlog_count metric stops being emitted when the task queue is idle " + + "with no pollers, so a metric-based autoscaler cannot detect a new backlog and scale back up " + + "from zero once all workers are gone" + // NewWorkerResourceTemplateValidator creates a validator from a manager. // // Three environment variables are read at startup (all injected by the Helm chart): @@ -261,25 +269,27 @@ func validateWorkerResourceTemplateSpec(spec WorkerResourceTemplateSpec, allowed if innerSpec, ok := obj["spec"].(map[string]interface{}); ok { innerSpecPath := field.NewPath("spec").Child("template").Child("spec") - // 4. minReplicas must not be 0. - // Scaling to zero is not safe with Temporal's approximate_backlog_count metric: that metric - // is only emitted while the task queue is loaded in memory (i.e. while at least one worker - // is polling). If all workers are scaled to zero and the task queue goes idle for ~5 minutes, - // the metric stops being emitted and resets to zero. If new tasks then arrive but no worker - // polls, the metric remains zero — making it impossible for a metric-based autoscaler to - // detect the backlog and scale back up. Until Temporal makes this a reliable metric for - // scaling workers to zero and back, minReplicas=0 is rejected. - if minReplicas, exists := innerSpec["minReplicas"]; exists { - if v, ok := minReplicas.(float64); ok && v == 0 { + // 4. Scale-to-zero guard — kind-aware. Other kinds (e.g. PodDisruptionBudget) + // have no scale-to-zero concept and are intentionally skipped. + switch kind { + case "HorizontalPodAutoscaler": + if v, ok := innerSpec["minReplicas"].(float64); ok && v == 0 { allErrs = append(allErrs, field.Invalid( innerSpecPath.Child("minReplicas"), 0, - "minReplicas must not be 0; scaling Temporal workers to zero is not currently safe: "+ - "Temporal's approximate_backlog_count metric stops being emitted when the task queue is idle "+ - "with no pollers, so a metric-based autoscaler cannot detect a new backlog and scale back up "+ - "from zero once all workers are gone", + "minReplicas must not be 0; "+scaleToZeroRationale, )) } + case "ScaledObject": + for _, fieldName := range []string{"minReplicaCount", "idleReplicaCount"} { + if v, ok := innerSpec[fieldName].(float64); ok && v == 0 { + allErrs = append(allErrs, field.Invalid( + innerSpecPath.Child(fieldName), + 0, + fieldName+" must not be 0; "+scaleToZeroRationale, + )) + } + } } // 5. scaleTargetRef: if absent or empty ({}), the controller injects it to point at diff --git a/api/v1alpha1/workerresourcetemplate_webhook_integration_test.go b/api/v1alpha1/workerresourcetemplate_webhook_integration_test.go index 662c7479..c6999cde 100644 --- a/api/v1alpha1/workerresourcetemplate_webhook_integration_test.go +++ b/api/v1alpha1/workerresourcetemplate_webhook_integration_test.go @@ -42,6 +42,31 @@ func hpaObjForIntegration() map[string]interface{} { } } +// scaledObjectForIntegration returns a minimal valid ScaledObject embedded object spec, +// including a token in the PromQL query to confirm the webhook accepts templates +// that contain substitution tokens (substitution itself happens at controller render time). +func scaledObjectForIntegration() map[string]interface{} { + return map[string]interface{}{ + "apiVersion": "keda.sh/v1alpha1", + "kind": "ScaledObject", + "spec": map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{}, + "minReplicaCount": float64(1), + "maxReplicaCount": float64(5), + "triggers": []interface{}{ + map[string]interface{}{ + "type": "prometheus", + "metadata": map[string]interface{}{ + "serverAddress": "http://prom:9090", + "threshold": "1", + "query": `sum(metric{build="__TEMPORAL_WORKER_BUILD_ID__"})`, + }, + }, + }, + }, + } +} + // makeWRTForWebhook builds a WorkerResourceTemplate in the given namespace. func makeWRTForWebhook(name, ns, workerDeploymentRef string, embeddedObj map[string]interface{}) *WorkerResourceTemplate { raw, _ := json.Marshal(embeddedObj) @@ -93,6 +118,31 @@ func grantControllerSAHPACreateAccess(ns string) { Expect(k8sClient.Create(ctx, rb)).To(Succeed()) } +// grantControllerSAScaledObjectCreateAccess creates a Role granting the controller SA +// (system:serviceaccount:test-system:test-controller) create access to keda.sh/scaledobjects +// in ns. Mirrors grantControllerSAHPACreateAccess. +func grantControllerSAScaledObjectCreateAccess(ns string) { + role := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{Name: "sa-so-creator", Namespace: ns}, + Rules: []rbacv1.PolicyRule{ + { + APIGroups: []string{"keda.sh"}, + Resources: []string{"scaledobjects"}, + Verbs: []string{"create", "update", "delete"}, + }, + }, + } + Expect(k8sClient.Create(ctx, role)).To(Succeed()) + rb := &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{Name: "sa-so-creator-rb", Namespace: ns}, + RoleRef: rbacv1.RoleRef{APIGroup: "rbac.authorization.k8s.io", Kind: "Role", Name: "sa-so-creator"}, + Subjects: []rbacv1.Subject{ + {Kind: "ServiceAccount", Name: "test-controller", Namespace: "test-system"}, + }, + } + Expect(k8sClient.Create(ctx, rb)).To(Succeed()) +} + // grantUserWRTCreateAccess grants a user permission to create WRTs in ns. // This is required so the kube-apiserver's RBAC check passes before calling the webhook. func grantUserWRTCreateAccess(ns, username string) { @@ -194,6 +244,15 @@ var _ = Describe("WorkerResourceTemplate webhook integration", func() { Expect(err.Error()).To(ContainSubstring("not authorized")) }) + It("allows creation of a ScaledObject-backed WRT when controller SA has keda.sh permission", func() { + ns := makeTestNamespace("wh-so-happy") + grantControllerSAScaledObjectCreateAccess(ns) + + wrt := makeWRTForWebhook("t-so-happy", ns, "my-worker", scaledObjectForIntegration()) + Expect(k8sClient.Create(ctx, wrt)).To(Succeed(), + "admission must succeed when controller SA can create ScaledObjects") + }) + // temporalWorkerDeploymentRef.name immutability enforced via a real HTTP update request. // First create a valid WRT, then attempt to change temporalWorkerDeploymentRef.name via k8sClient.Update. // The webhook must reject the update with a message about immutability. diff --git a/api/v1alpha1/workerresourcetemplate_webhook_test.go b/api/v1alpha1/workerresourcetemplate_webhook_test.go index 75aff411..d823d2d9 100644 --- a/api/v1alpha1/workerresourcetemplate_webhook_test.go +++ b/api/v1alpha1/workerresourcetemplate_webhook_test.go @@ -45,6 +45,24 @@ func validHPAObject() map[string]interface{} { } } +// validScaledObjectObject returns a minimal valid ScaledObject embedded object spec. +func validScaledObjectObject() map[string]interface{} { + return map[string]interface{}{ + "apiVersion": "keda.sh/v1alpha1", + "kind": "ScaledObject", + "spec": map[string]interface{}{ + "minReplicaCount": float64(1), + "maxReplicaCount": float64(10), + "triggers": []interface{}{ + map[string]interface{}{ + "type": "prometheus", + "metadata": map[string]interface{}{"query": "1", "threshold": "1", "serverAddress": "http://prom"}, + }, + }, + }, + } +} + // namespacedScope implements meta.RESTScope for namespace-scoped resources. type namespacedScope struct{} @@ -72,8 +90,9 @@ func newValidatorNoAPI() *temporaliov1alpha1.WorkerResourceTemplateValidator { RESTMapper: newFakeRESTMapper( schema.GroupVersionKind{Group: "autoscaling", Version: "v2", Kind: "HorizontalPodAutoscaler"}, schema.GroupVersionKind{Group: "policy", Version: "v1", Kind: "PodDisruptionBudget"}, + schema.GroupVersionKind{Group: "keda.sh", Version: "v1alpha1", Kind: "ScaledObject"}, ), - AllowedKinds: []string{"HorizontalPodAutoscaler", "PodDisruptionBudget"}, + AllowedKinds: []string{"HorizontalPodAutoscaler", "PodDisruptionBudget", "ScaledObject"}, } } @@ -594,3 +613,58 @@ func TestWorkerResourceTemplate_MultipleErrors(t *testing.T) { assert.Contains(t, err.Error(), "metadata.name is generated by the controller") assert.Contains(t, err.Error(), "minReplicas must not be 0") } + +func TestWorkerResourceTemplate_ValidateScaledObject(t *testing.T) { + v := newValidatorNoAPI() + + t.Run("valid ScaledObject is accepted", func(t *testing.T) { + wrt := newWRT("so", "mywd", validScaledObjectObject()) + _, err := v.ValidateCreate(context.Background(), wrt) + assert.NoError(t, err) + }) + + t.Run("minReplicaCount=0 is rejected with shared rationale", func(t *testing.T) { + obj := validScaledObjectObject() + obj["spec"].(map[string]interface{})["minReplicaCount"] = float64(0) + wrt := newWRT("so", "mywd", obj) + _, err := v.ValidateCreate(context.Background(), wrt) + require.Error(t, err) + assert.Contains(t, err.Error(), "minReplicaCount must not be 0") + assert.Contains(t, err.Error(), "approximate_backlog_count metric stops being emitted") + }) + + t.Run("idleReplicaCount=0 is rejected with shared rationale", func(t *testing.T) { + obj := validScaledObjectObject() + obj["spec"].(map[string]interface{})["idleReplicaCount"] = float64(0) + wrt := newWRT("so", "mywd", obj) + _, err := v.ValidateCreate(context.Background(), wrt) + require.Error(t, err) + assert.Contains(t, err.Error(), "idleReplicaCount must not be 0") + assert.Contains(t, err.Error(), "approximate_backlog_count metric stops being emitted") + }) + + t.Run("non-empty scaleTargetRef is rejected (reuses existing check)", func(t *testing.T) { + obj := validScaledObjectObject() + obj["spec"].(map[string]interface{})["scaleTargetRef"] = map[string]interface{}{ + "name": "user-set-name", + } + wrt := newWRT("so", "mywd", obj) + _, err := v.ValidateCreate(context.Background(), wrt) + require.Error(t, err) + assert.Contains(t, err.Error(), "scaleTargetRef") + }) + + t.Run("ScaledObject not in AllowedKinds is rejected", func(t *testing.T) { + restricted := &temporaliov1alpha1.WorkerResourceTemplateValidator{ + Client: fake.NewClientBuilder().Build(), + RESTMapper: newFakeRESTMapper( + schema.GroupVersionKind{Group: "keda.sh", Version: "v1alpha1", Kind: "ScaledObject"}, + ), + AllowedKinds: []string{"HorizontalPodAutoscaler"}, + } + wrt := newWRT("so", "mywd", validScaledObjectObject()) + _, err := restricted.ValidateCreate(context.Background(), wrt) + require.Error(t, err) + assert.Contains(t, err.Error(), "not in the allowed list") + }) +} diff --git a/docs/worker-resource-templates.md b/docs/worker-resource-templates.md index c6e9de8c..c364e6a1 100644 --- a/docs/worker-resource-templates.md +++ b/docs/worker-resource-templates.md @@ -38,6 +38,20 @@ The controller auto-injects two fields when you set them to `{}` (empty object) The webhook rejects any template that hardcodes `temporal_worker_deployment_name`, `temporal_worker_build_id`, or `temporal_namespace` in a metric selector — these are always controller-owned. +## Token substitution + +For CRDs that don't have a structured `matchLabels` field for per-version scoping — e.g. KEDA's `ScaledObject` Prometheus trigger (freeform PromQL `query`) or its native `type: temporal` trigger (freeform `buildId`/`namespace` metadata) — the controller performs token substitution over every string-valued leaf in `spec.template` at render time. The three tokens are: + +| Token | Substituted value | +|-------|-------------------| +| `__TEMPORAL_WORKER_DEPLOYMENT_NAME__` | `_` | +| `__TEMPORAL_WORKER_BUILD_ID__` | the active Build ID | +| `__TEMPORAL_NAMESPACE__` | the Temporal namespace | + +Tokens are opt-in: strings without any token are untouched. Unknown `__FOO__`-style tokens pass through unchanged — only the three tokens above are recognised. + +See [examples/wrt-keda-prometheus.yaml](../examples/wrt-keda-prometheus.yaml) and [examples/wrt-keda-temporal.yaml](../examples/wrt-keda-temporal.yaml) for full KEDA examples using each trigger type. + ## Resource naming Each per-Build-ID copy is given a unique, DNS-safe name derived from the `(twdName, wrtName, buildID)` triple. Names are capped at 47 characters to be safe for all Kubernetes resource types, including Deployment (which has pod-naming constraints that effectively limit Deployment names to ~47 characters). The name always ends with an 8-character hash of the full triple, so uniqueness is guaranteed even when the human-readable prefix is truncated. @@ -71,6 +85,21 @@ workerResourceTemplate: resources: ["poddisruptionbudgets"] ``` +To also allow KEDA ScaledObjects, add an entry with the `keda.sh` API group: + +```yaml +workerResourceTemplate: + allowedResources: + - kinds: ["HorizontalPodAutoscaler"] + apiGroups: ["autoscaling"] + resources: ["horizontalpodautoscalers"] + - kinds: ["ScaledObject"] + apiGroups: ["keda.sh"] + resources: ["scaledobjects"] +``` + +Requires KEDA CRDs installed in the cluster. Users who create `WorkerResourceTemplate`s with `ScaledObject` need RBAC permission to manage `scaledobjects.keda.sh` in their namespace directly — the webhook's SubjectAccessReview rejects the request otherwise. + Each entry has three fields: - `kinds` — kind names the webhook accepts (case-insensitive) - `apiGroups` — API groups used to generate the controller's RBAC rules @@ -162,6 +191,92 @@ spec: matchLabels: {} ``` +## Example: KEDA ScaledObject per worker version + +For clusters where KEDA owns the external-metrics APIService, HPAs with `type: External` cannot resolve against other sources. Produce KEDA `ScaledObject`s directly. + +KEDA offers two complementary triggers for Temporal autoscaling: + +- **`type: prometheus`** — scale on a PromQL query against whatever Prometheus-compatible store is scraping your Temporal metrics (Cloud monitoring, prometheus-adapter, etc.). Works well when your metrics pipeline already carries `temporal_worker_build_id` as a label. +- **`type: temporal`** (KEDA ≥ 2.17) — scale directly on Temporal's own task queue backlog via gRPC. Bypasses the Prometheus layer entirely; native per-`buildId` scoping. Prefer this when you don't want autoscaling to depend on your metrics pipeline, or when your metrics don't carry the per-version labels. + +Scale-to-zero (`minReplicaCount: 0` or `idleReplicaCount: 0`) is rejected by the webhook for both triggers — same Temporal-side reason as HPA `minReplicas: 0`: `approximate_backlog_count` is not emitted when the task queue is idle with no pollers, so the autoscaler cannot detect new work from a cold start. + +Both variants require KEDA installed in the cluster and `ScaledObject` added to `workerResourceTemplate.allowedResources` (see next section). + +### Using the Prometheus trigger + +```yaml +apiVersion: temporal.io/v1alpha1 +kind: WorkerResourceTemplate +metadata: + name: my-worker-keda + namespace: my-namespace +spec: + temporalWorkerDeploymentRef: + name: my-worker + template: + apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + spec: + # {} tells the controller to inject the versioned Deployment reference. + # Do not set this to a real value — the webhook will reject it. + scaleTargetRef: {} + minReplicaCount: 1 + maxReplicaCount: 10 + triggers: + - type: prometheus + metadata: + serverAddress: http://prometheus.monitoring.svc:9090 + threshold: "1" + query: | + sum(temporal_backlog_count_by_version{ + temporal_worker_deployment_name="__TEMPORAL_WORKER_DEPLOYMENT_NAME__", + temporal_worker_build_id="__TEMPORAL_WORKER_BUILD_ID__", + temporal_namespace="__TEMPORAL_NAMESPACE__" + }) +``` + +### Using KEDA's native Temporal scaler + +```yaml +apiVersion: temporal.io/v1alpha1 +kind: WorkerResourceTemplate +metadata: + name: my-worker-keda-temporal + namespace: my-namespace +spec: + temporalWorkerDeploymentRef: + name: my-worker + template: + apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + spec: + scaleTargetRef: {} + minReplicaCount: 1 + maxReplicaCount: 10 + triggers: + - type: temporal + metadata: + # Temporal gRPC endpoint — .tmprl.cloud:7233 for Temporal Cloud, + # or your self-hosted frontend service address. + endpoint: my-namespace.xxxxx.tmprl.cloud:7233 + namespace: __TEMPORAL_NAMESPACE__ + # The task queue this worker polls. Not templated — one WRT per queue. + taskQueue: my-task-queue + # Per-version backlog scoping via the controller-substituted token. + buildId: __TEMPORAL_WORKER_BUILD_ID__ + targetQueueSize: "10" + queueTypes: "workflow,activity" + authenticationRef: + # A KEDA TriggerAuthentication in the same namespace, providing + # Temporal credentials (mTLS cert/key or Cloud API key). See + # https://keda.sh/docs/latest/scalers/temporal/ for the spec. + name: temporal-cloud-auth +``` + +Note: the native Temporal scaler only needs `__TEMPORAL_NAMESPACE__` and `__TEMPORAL_WORKER_BUILD_ID__` — per-version scoping goes through the `buildId` metadata field, not through a metric label. `__TEMPORAL_WORKER_DEPLOYMENT_NAME__` is still substituted if present but typically unused here. + ## Checking status ```bash diff --git a/examples/wrt-keda-prometheus.yaml b/examples/wrt-keda-prometheus.yaml new file mode 100644 index 00000000..1a2489ea --- /dev/null +++ b/examples/wrt-keda-prometheus.yaml @@ -0,0 +1,60 @@ +# WorkerResourceTemplate — per-version KEDA ScaledObject (Prometheus trigger) +# +# For clusters where KEDA owns the external.metrics.k8s.io APIService (and so an HPA +# with type: External cannot be used). The controller creates one ScaledObject per +# active Build ID, pointing at the versioned Deployment; KEDA then drives per-version +# HPAs under the hood. +# +# Prerequisites: +# 1. KEDA installed in the cluster (>= 2.10 recommended). +# 2. Add ScaledObject to the controller's allowedResources in Helm values: +# workerResourceTemplate: +# allowedResources: +# - kinds: ["HorizontalPodAutoscaler"] +# apiGroups: ["autoscaling"] +# resources: ["horizontalpodautoscalers"] +# - kinds: ["ScaledObject"] +# apiGroups: ["keda.sh"] +# resources: ["scaledobjects"] +# 3. Prometheus reachable at the serverAddress below, scraping Temporal metrics +# with the usual versioned labels (temporal_worker_deployment_name, +# temporal_worker_build_id, temporal_namespace). +# +# Token substitution: +# The controller replaces these tokens with per-version values at render time: +# __TEMPORAL_WORKER_DEPLOYMENT_NAME__ → "_" +# __TEMPORAL_WORKER_BUILD_ID__ → the active Build ID +# __TEMPORAL_NAMESPACE__ → the Temporal namespace +# +# Apply: +# kubectl apply -f examples/wrt-keda-prometheus.yaml +apiVersion: temporal.io/v1alpha1 +kind: WorkerResourceTemplate +metadata: + name: helloworld-keda + namespace: default +spec: + temporalWorkerDeploymentRef: + name: helloworld + template: + apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + spec: + # {} tells the controller to inject the versioned Deployment reference. + scaleTargetRef: {} + minReplicaCount: 1 + maxReplicaCount: 30 + pollingInterval: 15 + cooldownPeriod: 120 + triggers: + - type: prometheus + metadata: + serverAddress: http://prometheus.monitoring.svc:9090 + threshold: "1" + query: | + sum(temporal_backlog_count_by_version{ + temporal_worker_deployment_name="__TEMPORAL_WORKER_DEPLOYMENT_NAME__", + temporal_worker_build_id="__TEMPORAL_WORKER_BUILD_ID__", + temporal_namespace="__TEMPORAL_NAMESPACE__", + task_type="Activity" + }) diff --git a/examples/wrt-keda-temporal.yaml b/examples/wrt-keda-temporal.yaml new file mode 100644 index 00000000..27c7274e --- /dev/null +++ b/examples/wrt-keda-temporal.yaml @@ -0,0 +1,94 @@ +# WorkerResourceTemplate — per-version KEDA ScaledObject (native Temporal scaler) +# +# Uses KEDA's built-in `type: temporal` trigger (KEDA >= 2.17), which queries +# Temporal's gRPC API directly for per-version task queue backlog. Avoids the +# Prometheus layer entirely — no metric pipeline, no scrape-interval lag, and +# native understanding of Worker Deployments versioning via the `buildId` field. +# +# When to prefer this over the Prometheus trigger: +# - Your metrics pipeline doesn't reliably emit `temporal_worker_build_id` +# as a label (many pipelines drop or rename it). +# - You want a shorter scale-up signal path (no scrape → TSDB → eval loop). +# - You don't want worker autoscaling coupled to your metrics stack. +# +# Prerequisites: +# 1. KEDA >= 2.17 installed in the cluster. +# 2. Add ScaledObject to the controller's allowedResources in Helm values: +# workerResourceTemplate: +# allowedResources: +# - kinds: ["HorizontalPodAutoscaler"] +# apiGroups: ["autoscaling"] +# resources: ["horizontalpodautoscalers"] +# - kinds: ["ScaledObject"] +# apiGroups: ["keda.sh"] +# resources: ["scaledobjects"] +# 3. A KEDA TriggerAuthentication (or ClusterTriggerAuthentication) in the +# same namespace providing Temporal credentials — mTLS cert/key for +# self-hosted, or an API key for Temporal Cloud. See +# https://keda.sh/docs/latest/scalers/temporal/#authentication-parameters +# for the full spec. A minimal mTLS example: +# +# apiVersion: keda.sh/v1alpha1 +# kind: TriggerAuthentication +# metadata: +# name: temporal-cloud-mtls-auth +# namespace: default +# spec: +# secretTargetRef: +# - parameter: cert +# name: temporal-cloud-mtls-tls +# key: tls.crt +# - parameter: key +# name: temporal-cloud-mtls-tls +# key: tls.key +# +# Token substitution (controller-side at render time): +# __TEMPORAL_NAMESPACE__ → the Temporal namespace +# __TEMPORAL_WORKER_BUILD_ID__ → the active Build ID +# +# The __TEMPORAL_WORKER_DEPLOYMENT_NAME__ token is still available but not +# needed here — per-version scoping is done via the `buildId` metadata field, +# not via a metric label. +# +# Apply: +# kubectl apply -f examples/wrt-keda-temporal.yaml +apiVersion: temporal.io/v1alpha1 +kind: WorkerResourceTemplate +metadata: + name: helloworld-keda-temporal + namespace: default +spec: + temporalWorkerDeploymentRef: + name: helloworld + template: + apiVersion: keda.sh/v1alpha1 + kind: ScaledObject + spec: + # {} tells the controller to inject the versioned Deployment reference. + scaleTargetRef: {} + minReplicaCount: 1 + maxReplicaCount: 30 + pollingInterval: 15 + cooldownPeriod: 120 + triggers: + - type: temporal + metadata: + # For Temporal Cloud, the endpoint is .tmprl.cloud:7233. + # For self-hosted, point at your frontend service. + endpoint: helloworld-namespace.xxxxx.tmprl.cloud:7233 + namespace: __TEMPORAL_NAMESPACE__ + taskQueue: helloworld-task-queue + buildId: __TEMPORAL_WORKER_BUILD_ID__ + # Scale up when more than 10 queued tasks are waiting per replica. + targetQueueSize: "10" + # Scale on both workflow and activity task queues. + queueTypes: "workflow,activity" + authenticationRef: + name: temporal-cloud-mtls-auth + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 300 + scaleUp: + stabilizationWindowSeconds: 30 diff --git a/helm/temporal-worker-controller/values.yaml b/helm/temporal-worker-controller/values.yaml index 2b8a825d..aac9d798 100644 --- a/helm/temporal-worker-controller/values.yaml +++ b/helm/temporal-worker-controller/values.yaml @@ -101,6 +101,12 @@ workerResourceTemplate: apiGroups: ["autoscaling"] resources: ["horizontalpodautoscalers"] + # Uncomment to allow KEDA ScaledObjects as WorkerResourceTemplate objects. + # Requires KEDA installed in the cluster. + # - kinds: ["ScaledObject"] + # apiGroups: ["keda.sh"] + # resources: ["scaledobjects"] + certmanager: # install controls whether cert-manager is installed as a Helm subchart. # Set to true if cert-manager is not already installed in the cluster. diff --git a/internal/k8s/tokens.go b/internal/k8s/tokens.go new file mode 100644 index 00000000..fb28db2e --- /dev/null +++ b/internal/k8s/tokens.go @@ -0,0 +1,51 @@ +package k8s + +import "strings" + +// Token names are public API. Changing them is a breaking change. +const ( + TokenTemporalWorkerDeploymentName = "__TEMPORAL_WORKER_DEPLOYMENT_NAME__" + TokenTemporalWorkerBuildID = "__TEMPORAL_WORKER_BUILD_ID__" + TokenTemporalNamespace = "__TEMPORAL_NAMESPACE__" +) + +// BuildTemporalTokens returns the token-to-value map used by SubstituteTemporalTokens. +// The values mirror the matchLabels injection values 1:1 so that both mechanisms +// target the same Prometheus series when used in the same template. +func BuildTemporalTokens(twdNamespace, twdName, buildID, temporalNamespace string) map[string]string { + return map[string]string{ + TokenTemporalWorkerDeploymentName: twdNamespace + "_" + twdName, + TokenTemporalWorkerBuildID: buildID, + TokenTemporalNamespace: temporalNamespace, + } +} + +// SubstituteTemporalTokens walks obj recursively and replaces every recognised +// token occurrence inside every string leaf. Non-string leaves are untouched. +// Unknown __FOO__-style tokens pass through unchanged — the function only +// substitutes tokens present in the map. +func SubstituteTemporalTokens(obj map[string]interface{}, tokens map[string]string) { + for k, v := range obj { + obj[k] = substituteTokensInValue(v, tokens) + } +} + +func substituteTokensInValue(v interface{}, tokens map[string]string) interface{} { + switch vv := v.(type) { + case string: + for token, replacement := range tokens { + vv = strings.ReplaceAll(vv, token, replacement) + } + return vv + case map[string]interface{}: + SubstituteTemporalTokens(vv, tokens) + return vv + case []interface{}: + for i, item := range vv { + vv[i] = substituteTokensInValue(item, tokens) + } + return vv + default: + return v + } +} diff --git a/internal/k8s/tokens_test.go b/internal/k8s/tokens_test.go new file mode 100644 index 00000000..6303733b --- /dev/null +++ b/internal/k8s/tokens_test.go @@ -0,0 +1,97 @@ +package k8s + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBuildTemporalTokens(t *testing.T) { + got := BuildTemporalTokens("ns", "mywd", "buildX", "tns") + assert.Equal(t, "ns_mywd", got["__TEMPORAL_WORKER_DEPLOYMENT_NAME__"]) + assert.Equal(t, "buildX", got["__TEMPORAL_WORKER_BUILD_ID__"]) + assert.Equal(t, "tns", got["__TEMPORAL_NAMESPACE__"]) +} + +func TestSubstituteTemporalTokens(t *testing.T) { + tokens := BuildTemporalTokens("ns", "mywd", "buildX", "tns") + + t.Run("replaces all three tokens in a single string", func(t *testing.T) { + obj := map[string]interface{}{ + "q": `x{dn="__TEMPORAL_WORKER_DEPLOYMENT_NAME__",b="__TEMPORAL_WORKER_BUILD_ID__",n="__TEMPORAL_NAMESPACE__"}`, + } + SubstituteTemporalTokens(obj, tokens) + assert.Equal(t, `x{dn="ns_mywd",b="buildX",n="tns"}`, obj["q"]) + }) + + t.Run("no-op when no tokens are present", func(t *testing.T) { + obj := map[string]interface{}{"q": "no tokens here"} + SubstituteTemporalTokens(obj, tokens) + assert.Equal(t, "no tokens here", obj["q"]) + }) + + t.Run("unknown __FOO__ tokens pass through untouched", func(t *testing.T) { + obj := map[string]interface{}{"q": "__UNKNOWN_TOKEN__ and __TEMPORAL_WORKER_BUILD_ID__"} + SubstituteTemporalTokens(obj, tokens) + assert.Equal(t, "__UNKNOWN_TOKEN__ and buildX", obj["q"]) + }) + + t.Run("walks nested maps", func(t *testing.T) { + obj := map[string]interface{}{ + "spec": map[string]interface{}{ + "triggers": map[string]interface{}{ + "query": "b=__TEMPORAL_WORKER_BUILD_ID__", + }, + }, + } + SubstituteTemporalTokens(obj, tokens) + spec := obj["spec"].(map[string]interface{}) + triggers := spec["triggers"].(map[string]interface{}) + assert.Equal(t, "b=buildX", triggers["query"]) + }) + + t.Run("walks slices of maps", func(t *testing.T) { + obj := map[string]interface{}{ + "triggers": []interface{}{ + map[string]interface{}{"query": "b=__TEMPORAL_WORKER_BUILD_ID__"}, + map[string]interface{}{"query": "n=__TEMPORAL_NAMESPACE__"}, + }, + } + SubstituteTemporalTokens(obj, tokens) + triggers := obj["triggers"].([]interface{}) + t0 := triggers[0].(map[string]interface{}) + t1 := triggers[1].(map[string]interface{}) + assert.Equal(t, "b=buildX", t0["query"]) + assert.Equal(t, "n=tns", t1["query"]) + }) + + t.Run("walks slices of strings", func(t *testing.T) { + obj := map[string]interface{}{ + "labels": []interface{}{"__TEMPORAL_WORKER_BUILD_ID__", "literal"}, + } + SubstituteTemporalTokens(obj, tokens) + labels := obj["labels"].([]interface{}) + assert.Equal(t, "buildX", labels[0]) + assert.Equal(t, "literal", labels[1]) + }) + + t.Run("non-string leaves are left alone", func(t *testing.T) { + obj := map[string]interface{}{ + "minReplicaCount": float64(1), + "pollingInterval": float64(15), + "enabled": true, + } + SubstituteTemporalTokens(obj, tokens) + assert.Equal(t, float64(1), obj["minReplicaCount"]) + assert.Equal(t, float64(15), obj["pollingInterval"]) + assert.Equal(t, true, obj["enabled"]) + }) + + t.Run("repeated token occurrences all substituted", func(t *testing.T) { + obj := map[string]interface{}{ + "q": "__TEMPORAL_WORKER_BUILD_ID__-__TEMPORAL_WORKER_BUILD_ID__", + } + SubstituteTemporalTokens(obj, tokens) + assert.Equal(t, "buildX-buildX", obj["q"]) + }) +} diff --git a/internal/k8s/workerresourcetemplates.go b/internal/k8s/workerresourcetemplates.go index a4ef4615..63c999f6 100644 --- a/internal/k8s/workerresourcetemplates.go +++ b/internal/k8s/workerresourcetemplates.go @@ -72,8 +72,9 @@ func ComputeWorkerResourceTemplateName(twdName, wrtName, buildID string) string // // Processing order: // 1. Unmarshal spec.template into an Unstructured -// 2. Auto-inject scaleTargetRef and matchLabels (Layer 1) -// 3. Set metadata (name, namespace, labels, owner reference) +// 2. Substitute __TEMPORAL_*__ tokens in every string leaf +// 3. Auto-inject scaleTargetRef and matchLabels (Layer 1) +// 4. Set metadata (name, namespace, labels, owner reference) func RenderWorkerResourceTemplate( wrt *temporaliov1alpha1.WorkerResourceTemplate, deployment *appsv1.Deployment, @@ -87,6 +88,12 @@ func RenderWorkerResourceTemplate( } twdName := wrt.Spec.TemporalWorkerDeploymentRef.Name + + // Token substitution runs first so structured injection downstream never + // sees unresolved tokens. Tokens are string-presence-driven — templates + // that don't contain them are untouched. + SubstituteTemporalTokens(obj.Object, BuildTemporalTokens(wrt.Namespace, twdName, buildID, temporalNamespace)) + selectorLabels := ComputeSelectorLabels(twdName, buildID) // Labels the controller appends to every metrics[*].external.metric.selector.matchLabels diff --git a/internal/k8s/workerresourcetemplates_test.go b/internal/k8s/workerresourcetemplates_test.go index 8fcaef48..9445d133 100644 --- a/internal/k8s/workerresourcetemplates_test.go +++ b/internal/k8s/workerresourcetemplates_test.go @@ -342,3 +342,128 @@ func TestRenderWorkerResourceTemplate(t *testing.T) { require.True(t, ok, "scaleTargetRef should have been auto-injected") assert.Equal(t, "my-worker-abc123", ref["name"]) } + +func TestRenderWorkerResourceTemplate_SubstitutesTokens(t *testing.T) { + template := map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "spec": map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{}, + "minReplicas": float64(1), + "maxReplicas": float64(10), + "description": "build=__TEMPORAL_WORKER_BUILD_ID__ ns=__TEMPORAL_NAMESPACE__ wd=__TEMPORAL_WORKER_DEPLOYMENT_NAME__", + }, + } + raw, err := json.Marshal(template) + require.NoError(t, err) + + wrt := &temporaliov1alpha1.WorkerResourceTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: "myhpa", Namespace: "team-ns", UID: types.UID("uid")}, + Spec: temporaliov1alpha1.WorkerResourceTemplateSpec{ + TemporalWorkerDeploymentRef: temporaliov1alpha1.TemporalWorkerDeploymentReference{Name: "mywd"}, + Template: runtime.RawExtension{Raw: raw}, + }, + } + deploy := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mywd-abc", Namespace: "team-ns"}} + + got, err := RenderWorkerResourceTemplate(wrt, deploy, "build-xyz", "default.acct") + require.NoError(t, err) + + spec, ok := got.Object["spec"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "build=build-xyz ns=default.acct wd=team-ns_mywd", spec["description"]) +} + +// validScaledObjectTemplate returns a minimal ScaledObject template suitable for render tests. +func validScaledObjectTemplate() map[string]interface{} { + return map[string]interface{}{ + "apiVersion": "keda.sh/v1alpha1", + "kind": "ScaledObject", + "spec": map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{}, + "minReplicaCount": float64(1), + "maxReplicaCount": float64(10), + "triggers": []interface{}{ + map[string]interface{}{ + "type": "prometheus", + "metadata": map[string]interface{}{ + "serverAddress": "http://prom:9090", + "threshold": "1", + "query": `sum(metric{deployment="__TEMPORAL_WORKER_DEPLOYMENT_NAME__",` + + `build="__TEMPORAL_WORKER_BUILD_ID__",ns="__TEMPORAL_NAMESPACE__"})`, + }, + }, + }, + }, + } +} + +func TestRenderWorkerResourceTemplate_ScaledObject_InjectsScaleTargetRef(t *testing.T) { + raw, err := json.Marshal(validScaledObjectTemplate()) + require.NoError(t, err) + wrt := &temporaliov1alpha1.WorkerResourceTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: "myso", Namespace: "team-ns", UID: types.UID("uid")}, + Spec: temporaliov1alpha1.WorkerResourceTemplateSpec{ + TemporalWorkerDeploymentRef: temporaliov1alpha1.TemporalWorkerDeploymentReference{Name: "mywd"}, + Template: runtime.RawExtension{Raw: raw}, + }, + } + deploy := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mywd-abc"}} + got, err := RenderWorkerResourceTemplate(wrt, deploy, "build-1", "ns1") + require.NoError(t, err) + + spec := got.Object["spec"].(map[string]interface{}) + ref := spec["scaleTargetRef"].(map[string]interface{}) + assert.Equal(t, "apps/v1", ref["apiVersion"]) + assert.Equal(t, "Deployment", ref["kind"]) + assert.Equal(t, "mywd-abc", ref["name"]) +} + +func TestRenderWorkerResourceTemplate_ScaledObject_PerVersionDeterminism(t *testing.T) { + raw, err := json.Marshal(validScaledObjectTemplate()) + require.NoError(t, err) + wrt := &temporaliov1alpha1.WorkerResourceTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: "myso", Namespace: "team-ns", UID: types.UID("uid")}, + Spec: temporaliov1alpha1.WorkerResourceTemplateSpec{ + TemporalWorkerDeploymentRef: temporaliov1alpha1.TemporalWorkerDeploymentReference{Name: "mywd"}, + Template: runtime.RawExtension{Raw: raw}, + }, + } + deploy1 := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mywd-1"}} + deploy2 := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mywd-2"}} + + got1, err := RenderWorkerResourceTemplate(wrt, deploy1, "build-1", "ns1") + require.NoError(t, err) + got2, err := RenderWorkerResourceTemplate(wrt, deploy2, "build-2", "ns1") + require.NoError(t, err) + + q1 := got1.Object["spec"].(map[string]interface{})["triggers"].([]interface{})[0]. + (map[string]interface{})["metadata"].(map[string]interface{})["query"] + q2 := got2.Object["spec"].(map[string]interface{})["triggers"].([]interface{})[0]. + (map[string]interface{})["metadata"].(map[string]interface{})["query"] + + assert.Contains(t, q1, `build="build-1"`) + assert.Contains(t, q2, `build="build-2"`) + assert.NotEqual(t, got1.GetName(), got2.GetName(), "per-version names must differ") + assert.NotEqual(t, ComputeRenderedObjectHash(got1), ComputeRenderedObjectHash(got2), "per-version hashes must differ") +} + +func TestRenderWorkerResourceTemplate_ScaledObject_HashStability(t *testing.T) { + raw, err := json.Marshal(validScaledObjectTemplate()) + require.NoError(t, err) + wrt := &temporaliov1alpha1.WorkerResourceTemplate{ + ObjectMeta: metav1.ObjectMeta{Name: "myso", Namespace: "team-ns", UID: types.UID("uid")}, + Spec: temporaliov1alpha1.WorkerResourceTemplateSpec{ + TemporalWorkerDeploymentRef: temporaliov1alpha1.TemporalWorkerDeploymentReference{Name: "mywd"}, + Template: runtime.RawExtension{Raw: raw}, + }, + } + deploy := &appsv1.Deployment{ObjectMeta: metav1.ObjectMeta{Name: "mywd-1"}} + + a, err := RenderWorkerResourceTemplate(wrt, deploy, "build-1", "ns1") + require.NoError(t, err) + b, err := RenderWorkerResourceTemplate(wrt, deploy, "build-1", "ns1") + require.NoError(t, err) + + assert.Equal(t, ComputeRenderedObjectHash(a), ComputeRenderedObjectHash(b), "re-rendering same inputs must produce same hash") +}