From bddf9dcea05f9133baa65db376230f7827a8be3b Mon Sep 17 00:00:00 2001 From: Minyi Zhu Date: Tue, 17 Mar 2026 13:04:01 +0100 Subject: [PATCH 1/4] Add Fleet Automation experiment lifecycle for remote agent management Implement the full experiment lifecycle for Fleet Automation, enabling the operator to receive experiment signals (start/stop/promote) via Remote Config and manage ControllerRevision-based spec snapshots for rollback. ## CRD changes Add to DatadogAgentStatus: - CurrentRevision / PreviousRevision: track ControllerRevision pointers - Experiment: ExperimentStatus with phase, startedAt, baselineRevision, id, and expectedSpecHash fields - ExperimentPhase enum: running, rollback, promoted, aborted, timeout ## Reconciler side (experiment package) New package: internal/controller/datadogagent/experiment/ - ControllerRevision creation with hash-based naming ({dda}-{md5[:10]}) - Revision pointer tracking on every spec change - Experiment phase handling hooked into internalReconcileV2 after defaults - Timeout detection: auto-rollback when now - startedAt >= 30min - Conflict detection via ExpectedSpecHash: RC computes hash of defaulted FA config; reconciler verifies spec matches on first reconcile, catching user edits between RC patch and first reconcile. Hash survives status-before-spec race. - Spec restoration from ControllerRevision baseline with direct status persistence (re-fetch after spec update to avoid resourceVersion conflict) - Two-reconcile terminal phase pattern: rollback/timeout phase is persisted and observable before being cleared on the next reconcile - GC of old ControllerRevisions (keep current + previous + baseline) - Status fields preserved in generateNewStatusFromDDA ## RC callback side (remoteconfig package) New file: pkg/remoteconfig/experiment.go - ExperimentSignal type for FA payloads (action + experiment_id + config) - parseExperimentSignal: detects experiment signals vs regular agent configs - Signal routing in agentConfigUpdateCallback: experiment signals handled separately, regular configs continue through normal path. Mixed batches handled correctly with per-update ACK/error reporting. - handleStartExperiment: guards (no config, no baseline, active experiment in any phase). Allows same-ID retry if ExpectedSpecHash still set (partial failure recovery). Sets phase=running, locks baselineRevision, computes expectedSpecHash with defaults applied, patches spec. - handleStopExperiment: validates phase=running and signal ID matches running experiment ID. Sets phase=rollback. - handlePromoteExperiment: same validation. Sets phase=promoted. ## RBAC Updated controllerrevisions verbs from list;watch to get;list;watch;create;update;delete across all manifests: - config/rbac/role.yaml (via kubebuilder marker) - bundle/manifests/datadog-operator.clusterserviceversion.yaml - marketplaces/addon_manifest.yaml - marketplaces/charts/google-marketplace/schema.yaml ## Tests (68 unit tests) Co-Authored-By: Claude Opus 4.6 (1M context) --- api/datadoghq/v2alpha1/datadogagent_types.go | 51 + .../v2alpha1/zz_generated.deepcopy.go | 24 + .../v2alpha1/zz_generated.openapi.go | 72 +- ...atadog-operator.clusterserviceversion.yaml | 4 + .../bases/v1/datadoghq.com_datadogagents.yaml | 40 + .../datadoghq.com_datadogagents_v2alpha1.json | 43 + config/rbac/role.yaml | 4 + .../datadogagent/controller_reconcile_v2.go | 15 +- .../controller_reconcile_v2_common.go | 8 + .../controller_reconcile_v2_helpers.go | 8 + .../experiment/experiment_test.go | 1099 +++++++++++++++++ .../datadogagent/experiment/lifecycle.go | 223 ++++ .../datadogagent/experiment/revision.go | 177 +++ .../datadogagent/experiment/types.go | 16 + .../datadogagent/testutils/client_utils.go | 2 + .../controller/datadogagent_controller.go | 2 +- marketplaces/addon_manifest.yaml | 4 + .../charts/google-marketplace/schema.yaml | 4 + pkg/remoteconfig/experiment.go | 261 ++++ pkg/remoteconfig/experiment_test.go | 551 +++++++++ pkg/remoteconfig/updater.go | 41 +- 21 files changed, 2642 insertions(+), 7 deletions(-) create mode 100644 internal/controller/datadogagent/experiment/experiment_test.go create mode 100644 internal/controller/datadogagent/experiment/lifecycle.go create mode 100644 internal/controller/datadogagent/experiment/revision.go create mode 100644 internal/controller/datadogagent/experiment/types.go create mode 100644 pkg/remoteconfig/experiment.go create mode 100644 pkg/remoteconfig/experiment_test.go diff --git a/api/datadoghq/v2alpha1/datadogagent_types.go b/api/datadoghq/v2alpha1/datadogagent_types.go index 97cc8c819..5c7f64bc0 100644 --- a/api/datadoghq/v2alpha1/datadogagent_types.go +++ b/api/datadoghq/v2alpha1/datadogagent_types.go @@ -2319,6 +2319,48 @@ type RemoteConfigConfiguration struct { Features *DatadogFeatures `json:"features,omitempty"` } +// ExperimentPhase represents the current phase of a Fleet Automation experiment. +// +kubebuilder:validation:Enum=running;rollback;promoted;aborted;timeout +type ExperimentPhase string + +const ( + // ExperimentPhaseRunning indicates a startExperiment signal was received and the experiment is active. + ExperimentPhaseRunning ExperimentPhase = "running" + // ExperimentPhaseRollback indicates a stopExperiment signal was received and rollback is in progress. + ExperimentPhaseRollback ExperimentPhase = "rollback" + // ExperimentPhasePromoted indicates a promoteExperiment signal was received and the experiment is permanent. + ExperimentPhasePromoted ExperimentPhase = "promoted" + // ExperimentPhaseAborted indicates the experiment was aborted due to an external DDA spec change. + ExperimentPhaseAborted ExperimentPhase = "aborted" + // ExperimentPhaseTimeout indicates the operator auto-rolled back after the timeout elapsed. + ExperimentPhaseTimeout ExperimentPhase = "timeout" +) + +// ExperimentStatus tracks the state of a Fleet Automation experiment. +// +k8s:openapi-gen=true +type ExperimentStatus struct { + // Phase is the current state of the experiment. + // +optional + Phase ExperimentPhase `json:"phase,omitempty"` + // StartedAt is the timestamp when the experiment began. + // Used by the reconciler to compute elapsed time for auto-rollback. + // +optional + StartedAt *metav1.Time `json:"startedAt,omitempty"` + // BaselineRevision is the name of the ControllerRevision capturing the pre-experiment spec. + // Locked at startExperiment and never shifted, even if subsequent edits occur. + // +optional + BaselineRevision string `json:"baselineRevision,omitempty"` + // ID is the unique experiment ID sent by Fleet Automation. + // +optional + ID string `json:"id,omitempty"` + // ExpectedSpecHash is the truncated MD5 hash of the spec that FA sent in + // startExperiment. The reconciler uses this on the first reconcile to verify + // the current spec matches what FA intended, detecting user edits that land + // between the RC spec patch and the first reconcile. + // +optional + ExpectedSpecHash string `json:"expectedSpecHash,omitempty"` +} + // DatadogAgentStatus defines the observed state of DatadogAgent. // +k8s:openapi-gen=true type DatadogAgentStatus struct { @@ -2346,6 +2388,15 @@ type DatadogAgentStatus struct { // RemoteConfigConfiguration stores the configuration received from RemoteConfig. // +optional RemoteConfigConfiguration *RemoteConfigConfiguration `json:"remoteConfigConfiguration,omitempty"` + // CurrentRevision is the name of the ControllerRevision for the spec currently applied. + // +optional + CurrentRevision string `json:"currentRevision,omitempty"` + // PreviousRevision is the name of the ControllerRevision for the spec just before the current one. + // +optional + PreviousRevision string `json:"previousRevision,omitempty"` + // Experiment tracks the state of an active or recent Fleet Automation experiment. + // +optional + Experiment *ExperimentStatus `json:"experiment,omitempty"` } // DatadogAgent defines Agent configuration, see reference https://github.com/DataDog/datadog-operator/blob/main/docs/configuration.v2alpha1.md diff --git a/api/datadoghq/v2alpha1/zz_generated.deepcopy.go b/api/datadoghq/v2alpha1/zz_generated.deepcopy.go index 9a2b003ba..c8abaf328 100644 --- a/api/datadoghq/v2alpha1/zz_generated.deepcopy.go +++ b/api/datadoghq/v2alpha1/zz_generated.deepcopy.go @@ -1326,6 +1326,11 @@ func (in *DatadogAgentStatus) DeepCopyInto(out *DatadogAgentStatus) { *out = new(RemoteConfigConfiguration) (*in).DeepCopyInto(*out) } + if in.Experiment != nil { + in, out := &in.Experiment, &out.Experiment + *out = new(ExperimentStatus) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatadogAgentStatus. @@ -1729,6 +1734,25 @@ func (in *EventTypes) DeepCopy() *EventTypes { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExperimentStatus) DeepCopyInto(out *ExperimentStatus) { + *out = *in + if in.StartedAt != nil { + in, out := &in.StartedAt, &out.StartedAt + *out = (*in).DeepCopy() + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExperimentStatus. +func (in *ExperimentStatus) DeepCopy() *ExperimentStatus { + if in == nil { + return nil + } + out := new(ExperimentStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExternalMetricsServerFeatureConfig) DeepCopyInto(out *ExternalMetricsServerFeatureConfig) { *out = *in diff --git a/api/datadoghq/v2alpha1/zz_generated.openapi.go b/api/datadoghq/v2alpha1/zz_generated.openapi.go index bdba1d97c..306021ee6 100644 --- a/api/datadoghq/v2alpha1/zz_generated.openapi.go +++ b/api/datadoghq/v2alpha1/zz_generated.openapi.go @@ -34,6 +34,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DogstatsdFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_DogstatsdFeatureConfig(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ErrorTrackingStandalone": schema_datadog_operator_api_datadoghq_v2alpha1_ErrorTrackingStandalone(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.EventCollectionFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_EventCollectionFeatureConfig(ref), + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ExperimentStatus": schema_datadog_operator_api_datadoghq_v2alpha1_ExperimentStatus(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.FIPSConfig": schema_datadog_operator_api_datadoghq_v2alpha1_FIPSConfig(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.HelmCheckFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_HelmCheckFeatureConfig(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.KubeStateMetricsCoreFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_KubeStateMetricsCoreFeatureConfig(ref), @@ -645,11 +646,31 @@ func schema_datadog_operator_api_datadoghq_v2alpha1_DatadogAgentStatus(ref commo Ref: ref("github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.RemoteConfigConfiguration"), }, }, + "currentRevision": { + SchemaProps: spec.SchemaProps{ + Description: "CurrentRevision is the name of the ControllerRevision for the spec currently applied.", + Type: []string{"string"}, + Format: "", + }, + }, + "previousRevision": { + SchemaProps: spec.SchemaProps{ + Description: "PreviousRevision is the name of the ControllerRevision for the spec just before the current one.", + Type: []string{"string"}, + Format: "", + }, + }, + "experiment": { + SchemaProps: spec.SchemaProps{ + Description: "Experiment tracks the state of an active or recent Fleet Automation experiment.", + Ref: ref("github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ExperimentStatus"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DaemonSetStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DeploymentStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.RemoteConfigConfiguration", "k8s.io/apimachinery/pkg/apis/meta/v1.Condition"}, + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DaemonSetStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DeploymentStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ExperimentStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.RemoteConfigConfiguration", "k8s.io/apimachinery/pkg/apis/meta/v1.Condition"}, } } @@ -1114,6 +1135,55 @@ func schema_datadog_operator_api_datadoghq_v2alpha1_EventCollectionFeatureConfig } } +func schema_datadog_operator_api_datadoghq_v2alpha1_ExperimentStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "ExperimentStatus tracks the state of a Fleet Automation experiment.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "phase": { + SchemaProps: spec.SchemaProps{ + Description: "Phase is the current state of the experiment.", + Type: []string{"string"}, + Format: "", + }, + }, + "startedAt": { + SchemaProps: spec.SchemaProps{ + Description: "StartedAt is the timestamp when the experiment began. Used by the reconciler to compute elapsed time for auto-rollback.", + Ref: ref("k8s.io/apimachinery/pkg/apis/meta/v1.Time"), + }, + }, + "baselineRevision": { + SchemaProps: spec.SchemaProps{ + Description: "BaselineRevision is the name of the ControllerRevision capturing the pre-experiment spec. Locked at startExperiment and never shifted, even if subsequent edits occur.", + Type: []string{"string"}, + Format: "", + }, + }, + "id": { + SchemaProps: spec.SchemaProps{ + Description: "ID is the unique experiment ID sent by Fleet Automation.", + Type: []string{"string"}, + Format: "", + }, + }, + "expectedSpecHash": { + SchemaProps: spec.SchemaProps{ + Description: "ExpectedSpecHash is the truncated MD5 hash of the spec that FA sent in startExperiment. The reconciler uses this on the first reconcile to verify the current spec matches what FA intended, detecting user edits that land between the RC spec patch and the first reconcile.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + Dependencies: []string{ + "k8s.io/apimachinery/pkg/apis/meta/v1.Time"}, + } +} + func schema_datadog_operator_api_datadoghq_v2alpha1_FIPSConfig(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/bundle/manifests/datadog-operator.clusterserviceversion.yaml b/bundle/manifests/datadog-operator.clusterserviceversion.yaml index 925611dbd..75f6a3498 100644 --- a/bundle/manifests/datadog-operator.clusterserviceversion.yaml +++ b/bundle/manifests/datadog-operator.clusterserviceversion.yaml @@ -381,7 +381,11 @@ spec: resources: - controllerrevisions verbs: + - create + - delete + - get - list + - update - watch - apiGroups: - apps diff --git a/config/crd/bases/v1/datadoghq.com_datadogagents.yaml b/config/crd/bases/v1/datadoghq.com_datadogagents.yaml index d1c514432..cee4972e0 100644 --- a/config/crd/bases/v1/datadoghq.com_datadogagents.yaml +++ b/config/crd/bases/v1/datadoghq.com_datadogagents.yaml @@ -8446,6 +8446,43 @@ spec: x-kubernetes-list-map-keys: - type x-kubernetes-list-type: map + currentRevision: + description: CurrentRevision is the name of the ControllerRevision for the spec currently applied. + type: string + experiment: + description: Experiment tracks the state of an active or recent Fleet Automation experiment. + properties: + baselineRevision: + description: |- + BaselineRevision is the name of the ControllerRevision capturing the pre-experiment spec. + Locked at startExperiment and never shifted, even if subsequent edits occur. + type: string + expectedSpecHash: + description: |- + ExpectedSpecHash is the truncated MD5 hash of the spec that FA sent in + startExperiment. The reconciler uses this on the first reconcile to verify + the current spec matches what FA intended, detecting user edits that land + between the RC spec patch and the first reconcile. + type: string + id: + description: ID is the unique experiment ID sent by Fleet Automation. + type: string + phase: + description: Phase is the current state of the experiment. + enum: + - running + - rollback + - promoted + - aborted + - timeout + type: string + startedAt: + description: |- + StartedAt is the timestamp when the experiment began. + Used by the reconciler to compute elapsed time for auto-rollback. + format: date-time + type: string + type: object otelAgentGateway: description: The actual state of the OTel Agent Gateway as a deployment. properties: @@ -8494,6 +8531,9 @@ spec: format: int32 type: integer type: object + previousRevision: + description: PreviousRevision is the name of the ControllerRevision for the spec just before the current one. + type: string remoteConfigConfiguration: description: RemoteConfigConfiguration stores the configuration received from RemoteConfig. properties: diff --git a/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json b/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json index 20d60a93e..5be992926 100644 --- a/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json +++ b/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json @@ -8150,6 +8150,45 @@ ], "x-kubernetes-list-type": "map" }, + "currentRevision": { + "description": "CurrentRevision is the name of the ControllerRevision for the spec currently applied.", + "type": "string" + }, + "experiment": { + "additionalProperties": false, + "description": "Experiment tracks the state of an active or recent Fleet Automation experiment.", + "properties": { + "baselineRevision": { + "description": "BaselineRevision is the name of the ControllerRevision capturing the pre-experiment spec.\nLocked at startExperiment and never shifted, even if subsequent edits occur.", + "type": "string" + }, + "expectedSpecHash": { + "description": "ExpectedSpecHash is the truncated MD5 hash of the spec that FA sent in\nstartExperiment. The reconciler uses this on the first reconcile to verify\nthe current spec matches what FA intended, detecting user edits that land\nbetween the RC spec patch and the first reconcile.", + "type": "string" + }, + "id": { + "description": "ID is the unique experiment ID sent by Fleet Automation.", + "type": "string" + }, + "phase": { + "description": "Phase is the current state of the experiment.", + "enum": [ + "running", + "rollback", + "promoted", + "aborted", + "timeout" + ], + "type": "string" + }, + "startedAt": { + "description": "StartedAt is the timestamp when the experiment began.\nUsed by the reconciler to compute elapsed time for auto-rollback.", + "format": "date-time", + "type": "string" + } + }, + "type": "object" + }, "otelAgentGateway": { "additionalProperties": false, "description": "The actual state of the OTel Agent Gateway as a deployment.", @@ -8207,6 +8246,10 @@ }, "type": "object" }, + "previousRevision": { + "description": "PreviousRevision is the name of the ControllerRevision for the spec just before the current one.", + "type": "string" + }, "remoteConfigConfiguration": { "additionalProperties": false, "description": "RemoteConfigConfiguration stores the configuration received from RemoteConfig.", diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index d0e9e790b..c69067261 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -103,7 +103,11 @@ rules: resources: - controllerrevisions verbs: + - create + - delete + - get - list + - update - watch - apiGroups: - apps diff --git a/internal/controller/datadogagent/controller_reconcile_v2.go b/internal/controller/datadogagent/controller_reconcile_v2.go index 06f7c339b..7a1cb1bd4 100644 --- a/internal/controller/datadogagent/controller_reconcile_v2.go +++ b/internal/controller/datadogagent/controller_reconcile_v2.go @@ -21,6 +21,7 @@ import ( "github.com/DataDog/datadog-operator/internal/controller/datadogagent/common" "github.com/DataDog/datadog-operator/internal/controller/datadogagent/component" "github.com/DataDog/datadog-operator/internal/controller/datadogagent/defaults" + "github.com/DataDog/datadog-operator/internal/controller/datadogagent/experiment" "github.com/DataDog/datadog-operator/internal/controller/datadogagent/feature" "github.com/DataDog/datadog-operator/pkg/agentprofile" "github.com/DataDog/datadog-operator/pkg/condition" @@ -48,7 +49,19 @@ func (r *Reconciler) internalReconcileV2(ctx context.Context, instance *datadogh instanceCopy := instance.DeepCopy() defaults.DefaultDatadogAgentSpec(&instanceCopy.Spec) - // 4. Delegate to the main reconcile function. + // 4. Handle experiment lifecycle (ControllerRevision management, timeout, conflict detection). + // When HandleExperimentLifecycle returns shouldReturn=true (rollback/timeout), + // it has already persisted the status directly (re-fetching after spec restore + // to avoid resourceVersion conflicts). The caller just returns. + if shouldReturn, res, err := experiment.HandleExperimentLifecycle( + ctx, r.client, instanceCopy, r.scheme, time.Now(), experiment.DefaultExperimentTimeout, + ); err != nil { + return res, err + } else if shouldReturn { + return res, nil + } + + // 5. Delegate to the main reconcile function. if r.options.DatadogAgentInternalEnabled { return r.reconcileInstanceV3(ctx, reqLogger, instanceCopy) } diff --git a/internal/controller/datadogagent/controller_reconcile_v2_common.go b/internal/controller/datadogagent/controller_reconcile_v2_common.go index ef70322ea..b59a33637 100644 --- a/internal/controller/datadogagent/controller_reconcile_v2_common.go +++ b/internal/controller/datadogagent/controller_reconcile_v2_common.go @@ -826,5 +826,13 @@ func IsEqualStatus(current *v2alpha1.DatadogAgentStatus, newStatus *v2alpha1.Dat return false } + if current.CurrentRevision != newStatus.CurrentRevision || + current.PreviousRevision != newStatus.PreviousRevision { + return false + } + if !apiequality.Semantic.DeepEqual(current.Experiment, newStatus.Experiment) { + return false + } + return condition.IsEqualConditions(current.Conditions, newStatus.Conditions) } diff --git a/internal/controller/datadogagent/controller_reconcile_v2_helpers.go b/internal/controller/datadogagent/controller_reconcile_v2_helpers.go index cdbf084c3..fb296f0c4 100644 --- a/internal/controller/datadogagent/controller_reconcile_v2_helpers.go +++ b/internal/controller/datadogagent/controller_reconcile_v2_helpers.go @@ -248,6 +248,8 @@ func (r *Reconciler) applyAndCleanupDependencies(ctx context.Context, logger log // generateNewStatusFromDDA generates a new status from a DDA status. // If an existing DCA token is present, it is copied to the new status. +// Experiment lifecycle fields (revision pointers, experiment state) are preserved +// so they are not overwritten by the reconciler's status update. func generateNewStatusFromDDA(ddaStatus *datadoghqv2alpha1.DatadogAgentStatus) *datadoghqv2alpha1.DatadogAgentStatus { status := &datadoghqv2alpha1.DatadogAgentStatus{} if ddaStatus != nil { @@ -259,6 +261,12 @@ func generateNewStatusFromDDA(ddaStatus *datadoghqv2alpha1.DatadogAgentStatus) * if ddaStatus.RemoteConfigConfiguration != nil { status.RemoteConfigConfiguration = ddaStatus.RemoteConfigConfiguration } + // Preserve experiment lifecycle fields + status.CurrentRevision = ddaStatus.CurrentRevision + status.PreviousRevision = ddaStatus.PreviousRevision + if ddaStatus.Experiment != nil { + status.Experiment = ddaStatus.Experiment.DeepCopy() + } } return status } diff --git a/internal/controller/datadogagent/experiment/experiment_test.go b/internal/controller/datadogagent/experiment/experiment_test.go new file mode 100644 index 000000000..0344e2176 --- /dev/null +++ b/internal/controller/datadogagent/experiment/experiment_test.go @@ -0,0 +1,1099 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package experiment + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" + apiutils "github.com/DataDog/datadog-operator/api/utils" +) + +const ( + testNamespace = "datadog" + testDDAName = "datadog-agent" +) + +// --- Test helpers --- + +func testScheme() *runtime.Scheme { + s := runtime.NewScheme() + _ = v2alpha1.AddToScheme(s) + _ = appsv1.AddToScheme(s) + return s +} + +func newTestDDA(spec v2alpha1.DatadogAgentSpec) *v2alpha1.DatadogAgent { + return &v2alpha1.DatadogAgent{ + TypeMeta: metav1.TypeMeta{ + Kind: "DatadogAgent", + APIVersion: "datadoghq.com/v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: testDDAName, + Namespace: testNamespace, + UID: types.UID("test-uid-123"), + }, + Spec: spec, + } +} + +func newTestDDAWithStatus(spec v2alpha1.DatadogAgentSpec, status v2alpha1.DatadogAgentStatus) *v2alpha1.DatadogAgent { + dda := newTestDDA(spec) + dda.Status = status + return dda +} + +func specWithAPM(enabled bool) v2alpha1.DatadogAgentSpec { + return v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(enabled), + }, + }, + } +} + +func specWithNPM(enabled bool) v2alpha1.DatadogAgentSpec { + return v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + NPM: &v2alpha1.NPMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(enabled), + }, + }, + } +} + +func buildFakeClient(s *runtime.Scheme, objs ...client.Object) client.Client { + return fake.NewClientBuilder(). + WithScheme(s). + WithObjects(objs...). + WithStatusSubresource(&v2alpha1.DatadogAgent{}). + Build() +} + +func makeControllerRevision(name, namespace string, owner *v2alpha1.DatadogAgent, spec v2alpha1.DatadogAgentSpec) *appsv1.ControllerRevision { + data, _ := serializeSpec(&spec) + cr := &appsv1.ControllerRevision{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Data: runtime.RawExtension{Raw: data}, + } + if owner != nil { + cr.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "datadoghq.com/v2alpha1", + Kind: "DatadogAgent", + Name: owner.Name, + UID: owner.UID, + Controller: apiutils.NewBoolPointer(true), + }, + } + } + return cr +} + +func timePtr(t time.Time) *metav1.Time { + mt := metav1.NewTime(t) + return &mt +} + +// ============================================================================ +// ComputeSpecHash tests +// ============================================================================ + +func TestComputeSpecHash_Deterministic(t *testing.T) { + spec := specWithAPM(true) + hash1, err1 := ComputeSpecHash(&spec) + hash2, err2 := ComputeSpecHash(&spec) + + require.NoError(t, err1) + require.NoError(t, err2) + assert.Equal(t, hash1, hash2, "same spec should produce same hash") + assert.Len(t, hash1, RevisionHashLength, "hash should be truncated to RevisionHashLength") +} + +func TestComputeSpecHash_DifferentSpecs(t *testing.T) { + spec1 := specWithAPM(true) + spec2 := specWithAPM(false) + + hash1, err1 := ComputeSpecHash(&spec1) + hash2, err2 := ComputeSpecHash(&spec2) + + require.NoError(t, err1) + require.NoError(t, err2) + assert.NotEqual(t, hash1, hash2, "different specs should produce different hashes") +} + +// ============================================================================ +// RevisionName tests +// ============================================================================ + +func TestRevisionName_Format(t *testing.T) { + name := RevisionName("datadog-agent", "abcdef1234") + assert.Equal(t, "datadog-agent-abcdef1234", name) +} + +// ============================================================================ +// CreateControllerRevision tests +// ============================================================================ + +func TestCreateControllerRevision_New(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + c := buildFakeClient(s, dda) + + name, created, err := CreateControllerRevision(context.TODO(), c, dda, s) + + require.NoError(t, err) + assert.True(t, created, "should have created a new revision") + assert.NotEmpty(t, name) + + // Verify the ControllerRevision exists + cr := &appsv1.ControllerRevision{} + err = c.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: testNamespace}, cr) + require.NoError(t, err) + assert.Equal(t, name, cr.Name) + assert.NotEmpty(t, cr.Data.Raw, "revision should contain serialized spec data") +} + +func TestCreateControllerRevision_AlreadyExists(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + + // Pre-create the ControllerRevision + hash, _ := ComputeSpecHash(&dda.Spec) + existingName := RevisionName(testDDAName, hash) + existingCR := makeControllerRevision(existingName, testNamespace, dda, dda.Spec) + c := buildFakeClient(s, dda, existingCR) + + name, created, err := CreateControllerRevision(context.TODO(), c, dda, s) + + require.NoError(t, err) + assert.False(t, created, "should not create a duplicate revision") + assert.Equal(t, existingName, name) +} + +func TestCreateControllerRevision_OwnerRef(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + c := buildFakeClient(s, dda) + + name, _, err := CreateControllerRevision(context.TODO(), c, dda, s) + require.NoError(t, err) + + cr := &appsv1.ControllerRevision{} + err = c.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: testNamespace}, cr) + require.NoError(t, err) + + require.Len(t, cr.OwnerReferences, 1) + ownerRef := cr.OwnerReferences[0] + assert.Equal(t, dda.Name, ownerRef.Name) + assert.Equal(t, dda.UID, ownerRef.UID) + assert.Equal(t, "DatadogAgent", ownerRef.Kind) + assert.NotNil(t, ownerRef.Controller) + assert.True(t, *ownerRef.Controller) +} + +// ============================================================================ +// GetControllerRevision tests +// ============================================================================ + +func TestGetControllerRevision_Exists(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + existingCR := makeControllerRevision("test-revision", testNamespace, dda, dda.Spec) + c := buildFakeClient(s, existingCR) + + cr, err := GetControllerRevision(context.TODO(), c, testNamespace, "test-revision") + + require.NoError(t, err) + assert.Equal(t, "test-revision", cr.Name) +} + +func TestGetControllerRevision_NotFound(t *testing.T) { + s := testScheme() + c := buildFakeClient(s) + + _, err := GetControllerRevision(context.TODO(), c, testNamespace, "nonexistent") + + require.Error(t, err) +} + +// ============================================================================ +// RestoreSpecFromRevision tests +// ============================================================================ + +func TestRestoreSpecFromRevision_Success(t *testing.T) { + s := testScheme() + originalSpec := specWithAPM(true) + modifiedSpec := specWithNPM(true) + + dda := newTestDDA(modifiedSpec) // DDA currently has NPM + baselineRevision := makeControllerRevision("baseline", testNamespace, dda, originalSpec) // Baseline has APM + c := buildFakeClient(s, dda, baselineRevision) + + err := RestoreSpecFromRevision(context.TODO(), c, dda, "baseline") + require.NoError(t, err) + + // Verify the DDA spec was restored to the baseline + restored := &v2alpha1.DatadogAgent{} + err = c.Get(context.TODO(), types.NamespacedName{Name: testDDAName, Namespace: testNamespace}, restored) + require.NoError(t, err) + + assert.NotNil(t, restored.Spec.Features.APM, "APM should be restored from baseline") + assert.True(t, apiutils.BoolValue(restored.Spec.Features.APM.Enabled), "APM should be enabled") +} + +func TestRestoreSpecFromRevision_NotFound(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + c := buildFakeClient(s, dda) + + err := RestoreSpecFromRevision(context.TODO(), c, dda, "nonexistent-revision") + + require.Error(t, err) +} + +// ============================================================================ +// ListOwnedRevisions tests +// ============================================================================ + +func TestListOwnedRevisions_FiltersUnowned(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + + // Owned by this DDA + ownedCR := makeControllerRevision("owned-rev", testNamespace, dda, dda.Spec) + // Not owned by anyone + unownedCR := makeControllerRevision("unowned-rev", testNamespace, nil, dda.Spec) + c := buildFakeClient(s, dda, ownedCR, unownedCR) + + revisions, err := ListOwnedRevisions(context.TODO(), c, dda) + + require.NoError(t, err) + assert.Len(t, revisions, 1, "should only return owned revisions") + assert.Equal(t, "owned-rev", revisions[0].Name) +} + +// ============================================================================ +// GarbageCollectRevisions tests +// ============================================================================ + +func TestGC_KeepsCurrent(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + currentCR := makeControllerRevision("current-rev", testNamespace, dda, dda.Spec) + c := buildFakeClient(s, dda, currentCR) + + keep := map[string]bool{"current-rev": true} + err := GarbageCollectRevisions(context.TODO(), c, dda, keep) + + require.NoError(t, err) + + cr := &appsv1.ControllerRevision{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "current-rev", Namespace: testNamespace}, cr) + require.NoError(t, err, "current revision should not be deleted") +} + +func TestGC_KeepsPrevious(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + previousCR := makeControllerRevision("previous-rev", testNamespace, dda, dda.Spec) + c := buildFakeClient(s, dda, previousCR) + + keep := map[string]bool{"previous-rev": true} + err := GarbageCollectRevisions(context.TODO(), c, dda, keep) + + require.NoError(t, err) + + cr := &appsv1.ControllerRevision{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "previous-rev", Namespace: testNamespace}, cr) + require.NoError(t, err, "previous revision should not be deleted") +} + +func TestGC_KeepsBaseline(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, dda.Spec) + orphanCR := makeControllerRevision("orphan-rev", testNamespace, dda, dda.Spec) + c := buildFakeClient(s, dda, baselineCR, orphanCR) + + keep := map[string]bool{"baseline-rev": true} + err := GarbageCollectRevisions(context.TODO(), c, dda, keep) + + require.NoError(t, err) + + // Baseline should survive + cr := &appsv1.ControllerRevision{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "baseline-rev", Namespace: testNamespace}, cr) + require.NoError(t, err, "baseline revision should not be deleted") + + // Orphan should be deleted + err = c.Get(context.TODO(), types.NamespacedName{Name: "orphan-rev", Namespace: testNamespace}, cr) + require.Error(t, err, "orphan revision should be deleted") +} + +func TestGC_DeletesOrphans(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + orphan1 := makeControllerRevision("orphan-1", testNamespace, dda, dda.Spec) + orphan2 := makeControllerRevision("orphan-2", testNamespace, dda, dda.Spec) + keepCR := makeControllerRevision("keep-me", testNamespace, dda, dda.Spec) + c := buildFakeClient(s, dda, orphan1, orphan2, keepCR) + + keep := map[string]bool{"keep-me": true} + err := GarbageCollectRevisions(context.TODO(), c, dda, keep) + + require.NoError(t, err) + + cr := &appsv1.ControllerRevision{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "keep-me", Namespace: testNamespace}, cr) + require.NoError(t, err, "kept revision should exist") + + err = c.Get(context.TODO(), types.NamespacedName{Name: "orphan-1", Namespace: testNamespace}, cr) + require.Error(t, err, "orphan-1 should be deleted") + + err = c.Get(context.TODO(), types.NamespacedName{Name: "orphan-2", Namespace: testNamespace}, cr) + require.Error(t, err, "orphan-2 should be deleted") +} + +func TestGC_NoRevisions(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + c := buildFakeClient(s, dda) + + keep := map[string]bool{"anything": true} + err := GarbageCollectRevisions(context.TODO(), c, dda, keep) + + require.NoError(t, err) +} + +func TestGC_AllKept(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + cr1 := makeControllerRevision("rev-1", testNamespace, dda, dda.Spec) + cr2 := makeControllerRevision("rev-2", testNamespace, dda, dda.Spec) + c := buildFakeClient(s, dda, cr1, cr2) + + keep := map[string]bool{"rev-1": true, "rev-2": true} + err := GarbageCollectRevisions(context.TODO(), c, dda, keep) + + require.NoError(t, err) + + // Both should still exist + cr := &appsv1.ControllerRevision{} + err = c.Get(context.TODO(), types.NamespacedName{Name: "rev-1", Namespace: testNamespace}, cr) + require.NoError(t, err) + err = c.Get(context.TODO(), types.NamespacedName{Name: "rev-2", Namespace: testNamespace}, cr) + require.NoError(t, err) +} + +// ============================================================================ +// BuildKeepSet tests +// ============================================================================ + +func TestBuildKeepSet_NoExperiment(t *testing.T) { + status := &v2alpha1.DatadogAgentStatus{ + CurrentRevision: "current-rev", + PreviousRevision: "previous-rev", + } + + keep := BuildKeepSet(status) + + assert.True(t, keep["current-rev"]) + assert.True(t, keep["previous-rev"]) + assert.Len(t, keep, 2) +} + +func TestBuildKeepSet_WithExperiment(t *testing.T) { + status := &v2alpha1.DatadogAgentStatus{ + CurrentRevision: "current-rev", + PreviousRevision: "previous-rev", + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + BaselineRevision: "baseline-rev", + }, + } + + keep := BuildKeepSet(status) + + assert.True(t, keep["current-rev"]) + assert.True(t, keep["previous-rev"]) + assert.True(t, keep["baseline-rev"]) + assert.Len(t, keep, 3) +} + +func TestBuildKeepSet_EmptyStringsExcluded(t *testing.T) { + status := &v2alpha1.DatadogAgentStatus{ + CurrentRevision: "current-rev", + PreviousRevision: "", // empty + Experiment: &v2alpha1.ExperimentStatus{ + BaselineRevision: "", // empty + }, + } + + keep := BuildKeepSet(status) + + assert.True(t, keep["current-rev"]) + assert.False(t, keep[""]) + assert.Len(t, keep, 1) +} + +// ============================================================================ +// checkTimeout tests +// ============================================================================ + +func TestCheckTimeout_NotExpired(t *testing.T) { + now := time.Now() + exp := &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + StartedAt: timePtr(now.Add(-10 * time.Minute)), + } + + result := CheckTimeout(exp, now, 30*time.Minute) + assert.False(t, result) +} + +func TestCheckTimeout_Expired(t *testing.T) { + now := time.Now() + exp := &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + StartedAt: timePtr(now.Add(-31 * time.Minute)), + } + + result := CheckTimeout(exp, now, 30*time.Minute) + assert.True(t, result) +} + +func TestCheckTimeout_NilStartedAt(t *testing.T) { + exp := &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + } + + result := CheckTimeout(exp, time.Now(), 30*time.Minute) + assert.False(t, result) +} + +// ============================================================================ +// checkConflict tests +// ============================================================================ + +func TestCheckConflict_NoConflict(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + dda := newTestDDA(spec) + + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + cr := makeControllerRevision(revName, testNamespace, dda, spec) + + dda.Status.CurrentRevision = revName + c := buildFakeClient(s, dda, cr) + + conflict, err := CheckConflict(context.TODO(), c, dda) + require.NoError(t, err) + assert.False(t, conflict) +} + +func TestCheckConflict_Detected(t *testing.T) { + s := testScheme() + originalSpec := specWithAPM(true) + modifiedSpec := specWithNPM(true) + + // DDA has modified spec, but currentRevision points to original + dda := newTestDDA(modifiedSpec) + hash, _ := ComputeSpecHash(&originalSpec) + revName := RevisionName(testDDAName, hash) + cr := makeControllerRevision(revName, testNamespace, dda, originalSpec) + + dda.Status.CurrentRevision = revName + c := buildFakeClient(s, dda, cr) + + conflict, err := CheckConflict(context.TODO(), c, dda) + require.NoError(t, err) + assert.True(t, conflict) +} + +func TestCheckConflict_NoCurrentRevision(t *testing.T) { + s := testScheme() + dda := newTestDDA(specWithAPM(true)) + // No currentRevision set + c := buildFakeClient(s, dda) + + conflict, err := CheckConflict(context.TODO(), c, dda) + require.NoError(t, err) + assert.False(t, conflict, "no conflict when no currentRevision is set") +} + +// ============================================================================ +// HandleExperimentLifecycle tests — Phase transitions +// ============================================================================ + +func TestLifecycle_NoExperiment_CreatesRevision(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + dda := newTestDDA(spec) + c := buildFakeClient(s, dda) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.False(t, shouldReturn, "should not return early when no experiment") + assert.NotEmpty(t, dda.Status.CurrentRevision, "currentRevision should be set") +} + +func TestLifecycle_NoExperiment_SpecUnchanged(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + + dda := newTestDDAWithStatus(spec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, spec) + c := buildFakeClient(s, dda, cr) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.False(t, shouldReturn) + assert.Equal(t, revName, dda.Status.CurrentRevision, "revision should not change") +} + +func TestLifecycle_Running_SetsStartedAt(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + + dda := newTestDDAWithStatus(spec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + // StartedAt is nil — should be set by the lifecycle handler + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, spec) + c := buildFakeClient(s, dda, cr) + + now := time.Now() + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + + require.NoError(t, err) + assert.False(t, shouldReturn) + assert.NotNil(t, dda.Status.Experiment.StartedAt, "startedAt should be set") +} + +func TestLifecycle_Running_NoConflictNoTimeout(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + + now := time.Now() + dda := newTestDDAWithStatus(spec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + StartedAt: timePtr(now.Add(-5 * time.Minute)), // well within timeout + BaselineRevision: "baseline-rev", + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, spec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, specWithNPM(true)) + c := buildFakeClient(s, dda, cr, baselineCR) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + + require.NoError(t, err) + assert.False(t, shouldReturn, "should continue reconciliation normally") + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, dda.Status.Experiment.Phase) +} + +func TestLifecycle_Rollback_RestoresBaseline(t *testing.T) { + s := testScheme() + experimentSpec := specWithNPM(true) + baselineSpec := specWithAPM(true) + + hash, _ := ComputeSpecHash(&experimentSpec) + revName := RevisionName(testDDAName, hash) + + dda := newTestDDAWithStatus(experimentSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRollback, + BaselineRevision: "baseline-rev", + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, experimentSpec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, baselineSpec) + c := buildFakeClient(s, dda, cr, baselineCR) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.True(t, shouldReturn, "should return early to trigger re-reconcile after spec restore") + + // Phase stays as rollback (persisted to API, observable) — cleared on next reconcile + require.NotNil(t, dda.Status.Experiment) + assert.Equal(t, v2alpha1.ExperimentPhaseRollback, dda.Status.Experiment.Phase) + assert.Nil(t, dda.Status.Experiment.StartedAt, "startedAt should be cleared") + assert.Empty(t, dda.Status.Experiment.BaselineRevision, "baselineRevision cleared to signal restore done") + + // Verify spec was restored + restored := &v2alpha1.DatadogAgent{} + err = c.Get(context.TODO(), types.NamespacedName{Name: testDDAName, Namespace: testNamespace}, restored) + require.NoError(t, err) + assert.NotNil(t, restored.Spec.Features.APM, "APM should be restored from baseline") +} + +// TestLifecycle_Rollback_SecondReconcile_ClearsExperiment verifies that after +// the rollback phase is persisted, the next reconcile clears the experiment. +func TestLifecycle_Rollback_SecondReconcile_ClearsExperiment(t *testing.T) { + s := testScheme() + restoredSpec := specWithAPM(true) + hash, _ := ComputeSpecHash(&restoredSpec) + revName := RevisionName(testDDAName, hash) + + // Simulate post-rollback state: spec is restored, phase=rollback persisted, + // baselineRevision cleared by handleRestore + dda := newTestDDAWithStatus(restoredSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRollback, + // BaselineRevision is empty — signals restore already happened + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, restoredSpec) + c := buildFakeClient(s, dda, cr) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.False(t, shouldReturn, "should not return early — just clearing state") + assert.Nil(t, dda.Status.Experiment, "experiment should be cleared on second reconcile") +} + +func TestLifecycle_Rollback_MissingBaseline(t *testing.T) { + s := testScheme() + spec := specWithNPM(true) + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + + dda := newTestDDAWithStatus(spec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRollback, + BaselineRevision: "nonexistent-baseline", + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, spec) + c := buildFakeClient(s, dda, cr) + + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.Error(t, err, "should error when baseline revision is missing") +} + +func TestLifecycle_Promoted_ClearsExperiment(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + + dda := newTestDDAWithStatus(spec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhasePromoted, + BaselineRevision: "baseline-rev", + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, spec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, specWithNPM(true)) + c := buildFakeClient(s, dda, cr, baselineCR) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.False(t, shouldReturn) + // Experiment should be cleared + assert.Nil(t, dda.Status.Experiment, "experiment should be cleared after promotion") +} + +func TestLifecycle_Aborted_NoAction(t *testing.T) { + s := testScheme() + spec := specWithNPM(true) // User's edit + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + + dda := newTestDDAWithStatus(spec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseAborted, + BaselineRevision: "baseline-rev", + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, spec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, specWithAPM(true)) + c := buildFakeClient(s, dda, cr, baselineCR) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.False(t, shouldReturn, "should not restore spec on abort — user's edit wins") + // Spec should remain NPM (user's edit), not APM (baseline) + assert.Equal(t, v2alpha1.ExperimentPhaseAborted, dda.Status.Experiment.Phase) +} + +// ============================================================================ +// Timeout lifecycle tests +// ============================================================================ + +func TestLifecycle_Timeout_RestoresBaseline(t *testing.T) { + s := testScheme() + experimentSpec := specWithNPM(true) + baselineSpec := specWithAPM(true) + + hash, _ := ComputeSpecHash(&experimentSpec) + revName := RevisionName(testDDAName, hash) + + now := time.Now() + dda := newTestDDAWithStatus(experimentSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + StartedAt: timePtr(now.Add(-31 * time.Minute)), // past timeout + BaselineRevision: "baseline-rev", + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, experimentSpec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, baselineSpec) + c := buildFakeClient(s, dda, cr, baselineCR) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + + require.NoError(t, err) + assert.True(t, shouldReturn, "should return early after timeout rollback") + + // Phase set to timeout (persisted to API, observable by FA) — cleared on next reconcile + require.NotNil(t, dda.Status.Experiment) + assert.Equal(t, v2alpha1.ExperimentPhaseTimeout, dda.Status.Experiment.Phase) + + // Verify spec was restored + restored := &v2alpha1.DatadogAgent{} + err = c.Get(context.TODO(), types.NamespacedName{Name: testDDAName, Namespace: testNamespace}, restored) + require.NoError(t, err) + assert.NotNil(t, restored.Spec.Features.APM, "APM should be restored from baseline") +} + +func TestLifecycle_Timeout_AfterRestart(t *testing.T) { + s := testScheme() + experimentSpec := specWithNPM(true) + baselineSpec := specWithAPM(true) + + hash, _ := ComputeSpecHash(&experimentSpec) + revName := RevisionName(testDDAName, hash) + + now := time.Now() + // Simulate: operator was down and now startedAt is way in the past + dda := newTestDDAWithStatus(experimentSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + StartedAt: timePtr(now.Add(-2 * time.Hour)), // way past timeout + BaselineRevision: "baseline-rev", + }, + }) + + cr := makeControllerRevision(revName, testNamespace, dda, experimentSpec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, baselineSpec) + c := buildFakeClient(s, dda, cr, baselineCR) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + + require.NoError(t, err) + assert.True(t, shouldReturn, "should immediately roll back after restart if timeout passed") + require.NotNil(t, dda.Status.Experiment) + assert.Equal(t, v2alpha1.ExperimentPhaseTimeout, dda.Status.Experiment.Phase) +} + +// ============================================================================ +// Conflict detection lifecycle tests +// ============================================================================ + +func TestLifecycle_Conflict_Aborts(t *testing.T) { + s := testScheme() + // DDA has been externally modified to NPM + modifiedSpec := specWithNPM(true) + // But currentRevision points to the experiment spec (APM) + experimentSpec := specWithAPM(true) + expHash, _ := ComputeSpecHash(&experimentSpec) + expRevName := RevisionName(testDDAName, expHash) + + now := time.Now() + dda := newTestDDAWithStatus(modifiedSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: expRevName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + StartedAt: timePtr(now.Add(-5 * time.Minute)), + BaselineRevision: "baseline-rev", + }, + }) + + expCR := makeControllerRevision(expRevName, testNamespace, dda, experimentSpec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, specWithAPM(false)) + c := buildFakeClient(s, dda, expCR, baselineCR) + + shouldReturn, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + + require.NoError(t, err) + // On conflict, we abort but still need to create a revision for the new spec + assert.Equal(t, v2alpha1.ExperimentPhaseAborted, dda.Status.Experiment.Phase) + assert.Nil(t, dda.Status.Experiment.StartedAt, "startedAt should be cleared on abort") + // shouldReturn depends on implementation — conflict detection doesn't restore spec + _ = shouldReturn +} + +func TestLifecycle_Conflict_PreservesBaseline(t *testing.T) { + s := testScheme() + modifiedSpec := specWithNPM(true) + experimentSpec := specWithAPM(true) + expHash, _ := ComputeSpecHash(&experimentSpec) + expRevName := RevisionName(testDDAName, expHash) + + now := time.Now() + dda := newTestDDAWithStatus(modifiedSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: expRevName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + StartedAt: timePtr(now.Add(-5 * time.Minute)), + BaselineRevision: "baseline-rev", + }, + }) + + expCR := makeControllerRevision(expRevName, testNamespace, dda, experimentSpec) + baselineCR := makeControllerRevision("baseline-rev", testNamespace, dda, specWithAPM(false)) + c := buildFakeClient(s, dda, expCR, baselineCR) + + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + + require.NoError(t, err) + assert.Equal(t, "baseline-rev", dda.Status.Experiment.BaselineRevision, + "baselineRevision should be preserved after abort for FA acknowledgment") +} + +// ============================================================================ +// Revision pointer tracking tests +// ============================================================================ + +func TestRevisionPointers_FirstReconcile(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + dda := newTestDDA(spec) // No status set yet + c := buildFakeClient(s, dda) + + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.NotEmpty(t, dda.Status.CurrentRevision, "currentRevision should be set on first reconcile") + assert.Empty(t, dda.Status.PreviousRevision, "previousRevision should be empty on first reconcile") +} + +func TestRevisionPointers_SpecChange(t *testing.T) { + s := testScheme() + oldSpec := specWithAPM(true) + newSpec := specWithNPM(true) + + oldHash, _ := ComputeSpecHash(&oldSpec) + oldRevName := RevisionName(testDDAName, oldHash) + oldCR := makeControllerRevision(oldRevName, testNamespace, nil, oldSpec) + + // DDA currently has the new spec, but currentRevision points to old + dda := newTestDDAWithStatus(newSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: oldRevName, + }) + // Set UID and ownerRef for the old CR + oldCR.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: "datadoghq.com/v2alpha1", + Kind: "DatadogAgent", + Name: dda.Name, + UID: dda.UID, + Controller: apiutils.NewBoolPointer(true), + }, + } + c := buildFakeClient(s, dda, oldCR) + + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.Equal(t, oldRevName, dda.Status.PreviousRevision, "previousRevision should be the old currentRevision") + assert.NotEqual(t, oldRevName, dda.Status.CurrentRevision, "currentRevision should be updated to new hash") +} + +// TestLifecycle_FirstReconcile_FAPayloadMatches tests the happy path: +// RC sets ExpectedSpecHash, patches spec to B. First reconcile sees spec=B, +// hash matches ExpectedSpecHash → experiment proceeds normally. +func TestLifecycle_FirstReconcile_FAPayloadMatches(t *testing.T) { + s := testScheme() + experimentSpec := specWithNPM(true) + expHash, _ := ComputeSpecHash(&experimentSpec) + + baselineSpec := specWithAPM(false) + baselineHash, _ := ComputeSpecHash(&baselineSpec) + baselineRevName := RevisionName(testDDAName, baselineHash) + + now := time.Now() + dda := newTestDDAWithStatus(experimentSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: baselineRevName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + BaselineRevision: baselineRevName, + ExpectedSpecHash: expHash, // RC set this + }, + }) + + baselineCR := makeControllerRevision(baselineRevName, testNamespace, dda, baselineSpec) + c := buildFakeClient(s, dda, baselineCR) + + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + require.NoError(t, err) + + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, dda.Status.Experiment.Phase, + "experiment should continue — spec matches FA payload") + assert.Empty(t, dda.Status.Experiment.ExpectedSpecHash, + "ExpectedSpecHash should be cleared after successful validation") +} + +// TestLifecycle_StatusBeforeSpec_HashSurvives tests the race where the RC +// status update (setting ExpectedSpecHash) triggers a reconcile before the +// spec patch arrives. The spec is still the baseline, so specChanged=false. +// ExpectedSpecHash must NOT be cleared — it must survive until the spec +// patch arrives on a subsequent reconcile. +func TestLifecycle_StatusBeforeSpec_HashSurvives(t *testing.T) { + s := testScheme() + // Spec is still the baseline (RC hasn't patched spec yet) + baselineSpec := specWithAPM(false) + baselineHash, _ := ComputeSpecHash(&baselineSpec) + baselineRevName := RevisionName(testDDAName, baselineHash) + + // But status already has ExpectedSpecHash (RC updated status first) + experimentSpec := specWithNPM(true) + expHash, _ := ComputeSpecHash(&experimentSpec) + + now := time.Now() + dda := newTestDDAWithStatus(baselineSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: baselineRevName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + BaselineRevision: baselineRevName, + ExpectedSpecHash: expHash, + }, + }) + + baselineCR := makeControllerRevision(baselineRevName, testNamespace, dda, baselineSpec) + c := buildFakeClient(s, dda, baselineCR) + + // First reconcile: spec unchanged (still baseline), status has ExpectedSpecHash + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + require.NoError(t, err) + + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, dda.Status.Experiment.Phase, + "experiment should still be running — spec hasn't changed yet") + assert.Equal(t, expHash, dda.Status.Experiment.ExpectedSpecHash, + "ExpectedSpecHash must survive when spec hasn't changed") + + // Second reconcile: spec patch arrives (NPM enabled) + dda.Spec = experimentSpec + _, _, err = HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + require.NoError(t, err) + + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, dda.Status.Experiment.Phase, + "experiment should continue — spec matches ExpectedSpecHash") + assert.Empty(t, dda.Status.Experiment.ExpectedSpecHash, + "ExpectedSpecHash should be cleared after successful validation") +} + +// TestLifecycle_FirstReconcile_UserEditBeforeReconcile tests the edge case: +// RC patches spec to B, but user edits to C before the first reconcile. +// ExpectedSpecHash != hash(C) → abort immediately. +func TestLifecycle_FirstReconcile_UserEditBeforeReconcile(t *testing.T) { + s := testScheme() + // FA intended spec B (NPM) + experimentSpec := specWithNPM(true) + expHash, _ := ComputeSpecHash(&experimentSpec) + + // But user changed spec to C (APM enabled) + userSpec := specWithAPM(true) + + baselineSpec := specWithAPM(false) + baselineHash, _ := ComputeSpecHash(&baselineSpec) + baselineRevName := RevisionName(testDDAName, baselineHash) + + now := time.Now() + dda := newTestDDAWithStatus(userSpec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: baselineRevName, + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + BaselineRevision: baselineRevName, + ExpectedSpecHash: expHash, // Hash of FA's intended spec (NPM) + }, + }) + + baselineCR := makeControllerRevision(baselineRevName, testNamespace, dda, baselineSpec) + c := buildFakeClient(s, dda, baselineCR) + + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, now, DefaultExperimentTimeout) + require.NoError(t, err) + + assert.Equal(t, v2alpha1.ExperimentPhaseAborted, dda.Status.Experiment.Phase, + "experiment should abort — user edited spec before first reconcile") +} + +func TestRevisionPointers_NoChange(t *testing.T) { + s := testScheme() + spec := specWithAPM(true) + hash, _ := ComputeSpecHash(&spec) + revName := RevisionName(testDDAName, hash) + + dda := newTestDDAWithStatus(spec, v2alpha1.DatadogAgentStatus{ + CurrentRevision: revName, + PreviousRevision: "old-rev", + }) + + cr := makeControllerRevision(revName, testNamespace, dda, spec) + c := buildFakeClient(s, dda, cr) + + _, _, err := HandleExperimentLifecycle(context.TODO(), c, dda, s, time.Now(), DefaultExperimentTimeout) + + require.NoError(t, err) + assert.Equal(t, revName, dda.Status.CurrentRevision, "currentRevision should not change") + assert.Equal(t, "old-rev", dda.Status.PreviousRevision, "previousRevision should not change") +} diff --git a/internal/controller/datadogagent/experiment/lifecycle.go b/internal/controller/datadogagent/experiment/lifecycle.go new file mode 100644 index 000000000..13d6063ff --- /dev/null +++ b/internal/controller/datadogagent/experiment/lifecycle.go @@ -0,0 +1,223 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package experiment + +import ( + "context" + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" +) + +// HandleExperimentLifecycle is the main hook called from the reconciler. +// It manages ControllerRevision creation, revision pointer updates, and experiment phase transitions. +// Returns (shouldReturn, result, err). shouldReturn=true means the reconciler should return early. +func HandleExperimentLifecycle(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent, scheme *runtime.Scheme, now time.Time, timeout time.Duration) (bool, reconcile.Result, error) { + // Step 1: Handle terminal phases and rollback/restore phases. + // These run BEFORE revision creation. + if dda.Status.Experiment != nil { + switch dda.Status.Experiment.Phase { + case v2alpha1.ExperimentPhaseRollback: + if dda.Status.Experiment.BaselineRevision != "" { + // First reconcile after stopExperiment: restore spec from baseline. + // handleRestore clears baselineRevision so the next reconcile + // knows the restore already happened. + return handleRestore(ctx, c, dda, v2alpha1.ExperimentPhaseRollback) + } + // Second reconcile: restore already done, clear experiment. + dda.Status.Experiment = nil + + case v2alpha1.ExperimentPhaseTimeout: + if dda.Status.Experiment.BaselineRevision != "" { + // Same as rollback: first reconcile after timeout detection. + // This path is entered when the timeout phase was persisted on + // a prior early return that triggered the restore. + return handleRestore(ctx, c, dda, v2alpha1.ExperimentPhaseTimeout) + } + // Second reconcile: restore already done, clear experiment. + dda.Status.Experiment = nil + + case v2alpha1.ExperimentPhasePromoted: + // Promoted: clear experiment state, continue to revision tracking. + dda.Status.Experiment = nil + + case v2alpha1.ExperimentPhaseAborted: + // Aborted: no action needed. Experiment stays in aborted state + // until FA acknowledges (or a new experiment starts). + } + } + + // Step 2: Create ControllerRevision for current spec if it changed. + revName, created, err := CreateControllerRevision(ctx, c, dda, scheme) + if err != nil { + return false, reconcile.Result{}, err + } + + // Step 3: Update revision pointers if spec changed. + // Save the old currentRevision BEFORE updating — needed for conflict detection. + prevCurrentRevision := dda.Status.CurrentRevision + specChanged := false + if created || dda.Status.CurrentRevision == "" { + if dda.Status.CurrentRevision != "" && dda.Status.CurrentRevision != revName { + dda.Status.PreviousRevision = dda.Status.CurrentRevision + specChanged = true + } + dda.Status.CurrentRevision = revName + } else if dda.Status.CurrentRevision != revName { + dda.Status.PreviousRevision = dda.Status.CurrentRevision + dda.Status.CurrentRevision = revName + specChanged = true + } + + // Step 4: Handle running experiment (timeout + conflict detection). + if dda.Status.Experiment != nil && dda.Status.Experiment.Phase == v2alpha1.ExperimentPhaseRunning { + shouldReturn, result, err := handleRunning(ctx, c, dda, prevCurrentRevision, specChanged, now, timeout) + if err != nil || shouldReturn { + return shouldReturn, result, err + } + } + + // Step 5: GC old revisions + keep := BuildKeepSet(&dda.Status) + if err := GarbageCollectRevisions(ctx, c, dda, keep); err != nil { + return false, reconcile.Result{}, err + } + + return false, reconcile.Result{}, nil +} + +// handleRestore restores the DDA spec from the baseline revision, then +// persists the terminal experiment phase directly via a status update. +// +// The spec restore (client.Update) bumps the resourceVersion, so we must +// re-fetch the object before writing the status. This avoids the conflict +// that would occur if the caller tried to persist status using a stale copy. +func handleRestore(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent, phase v2alpha1.ExperimentPhase) (bool, reconcile.Result, error) { + exp := dda.Status.Experiment + if exp == nil || exp.BaselineRevision == "" { + return false, reconcile.Result{}, nil + } + + if err := RestoreSpecFromRevision(ctx, c, dda, exp.BaselineRevision); err != nil { + return false, reconcile.Result{}, err + } + + // Re-fetch to get the current resourceVersion after the spec update. + refreshed := dda.DeepCopy() + if err := c.Get(ctx, client.ObjectKeyFromObject(dda), refreshed); err != nil { + return false, reconcile.Result{}, fmt.Errorf("failed to re-fetch DDA after spec restore: %w", err) + } + + // Set terminal phase and clear baselineRevision to signal the restore is done. + refreshed.Status.Experiment = &v2alpha1.ExperimentStatus{ + Phase: phase, + ID: exp.ID, + } + if err := c.Status().Update(ctx, refreshed); err != nil { + return false, reconcile.Result{}, fmt.Errorf("failed to persist experiment status after restore: %w", err) + } + + // Update the in-memory object so the caller sees the persisted state. + dda.Status.Experiment = refreshed.Status.Experiment + + return true, reconcile.Result{Requeue: true}, nil +} + +// handleRunning checks for timeout and conflict during a running experiment. +// prevCurrentRevision is the currentRevision value before this reconcile updated it. +// specChanged is true if the spec hash differs from prevCurrentRevision. +func handleRunning(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent, prevCurrentRevision string, specChanged bool, now time.Time, timeout time.Duration) (bool, reconcile.Result, error) { + exp := dda.Status.Experiment + + // Set startedAt if not already set (first reconcile after experiment start) + if exp.StartedAt == nil { + mt := metav1.NewTime(now) + exp.StartedAt = &mt + } + + // Check timeout — restore baseline and set phase=timeout + if CheckTimeout(exp, now, timeout) { + return handleRestore(ctx, c, dda, v2alpha1.ExperimentPhaseTimeout) + } + + // Check conflict: if the spec changed during a running experiment, determine + // whether it's the expected FA-owned mutation or an external edit. + if specChanged && prevCurrentRevision != "" { + if exp.ExpectedSpecHash != "" { + // ExpectedSpecHash is set — this is the first spec change since + // startExperiment. Verify it matches what FA sent. + currentHash, err := ComputeSpecHash(&dda.Spec) + if err != nil { + return false, reconcile.Result{}, err + } + if currentHash != exp.ExpectedSpecHash { + // Spec doesn't match FA payload — user edited after RC patched + exp.Phase = v2alpha1.ExperimentPhaseAborted + exp.StartedAt = nil + return false, reconcile.Result{}, nil + } + // Spec matches FA payload — expected mutation, clear the hash + exp.ExpectedSpecHash = "" + } else { + // ExpectedSpecHash already cleared (validated on a prior reconcile) + // — any subsequent spec change is an external edit. + exp.Phase = v2alpha1.ExperimentPhaseAborted + exp.StartedAt = nil + return false, reconcile.Result{}, nil + } + } + // When specChanged is false and ExpectedSpecHash is still set, keep it. + // The RC status update (which sets ExpectedSpecHash) arrives before the + // spec patch, so the first reconcile may see the hash but not the spec + // change yet. The hash must survive until the spec patch arrives. + + return false, reconcile.Result{}, nil +} + +// CheckTimeout returns true if the experiment has exceeded the timeout duration. +func CheckTimeout(exp *v2alpha1.ExperimentStatus, now time.Time, timeout time.Duration) bool { + if exp == nil || exp.StartedAt == nil { + return false + } + return now.Sub(exp.StartedAt.Time) >= timeout +} + +// CheckConflict returns true if the current DDA spec hash differs from the +// ControllerRevision pointed to by currentRevision (external edit detected). +func CheckConflict(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent) (bool, error) { + if dda.Status.CurrentRevision == "" { + return false, nil + } + + currentHash, err := ComputeSpecHash(&dda.Spec) + if err != nil { + return false, err + } + + currentRevName := RevisionName(dda.Name, currentHash) + return currentRevName != dda.Status.CurrentRevision, nil +} + +// BuildKeepSet returns the set of revision names that should be protected from GC. +func BuildKeepSet(status *v2alpha1.DatadogAgentStatus) map[string]bool { + keep := make(map[string]bool) + if status.CurrentRevision != "" { + keep[status.CurrentRevision] = true + } + if status.PreviousRevision != "" { + keep[status.PreviousRevision] = true + } + if status.Experiment != nil && status.Experiment.BaselineRevision != "" { + keep[status.Experiment.BaselineRevision] = true + } + return keep +} diff --git a/internal/controller/datadogagent/experiment/revision.go b/internal/controller/datadogagent/experiment/revision.go new file mode 100644 index 000000000..f9a29d2f0 --- /dev/null +++ b/internal/controller/datadogagent/experiment/revision.go @@ -0,0 +1,177 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package experiment + +import ( + "context" + "encoding/json" + "fmt" + + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" + "github.com/DataDog/datadog-operator/pkg/controller/utils/comparison" +) + +// ComputeSpecHash returns a truncated MD5 hash of the DDA spec for ControllerRevision naming. +func ComputeSpecHash(spec *v2alpha1.DatadogAgentSpec) (string, error) { + hash, err := comparison.GenerateMD5ForSpec(spec) + if err != nil { + return "", fmt.Errorf("failed to compute spec hash: %w", err) + } + if len(hash) > RevisionHashLength { + hash = hash[:RevisionHashLength] + } + return hash, nil +} + +// RevisionName returns the ControllerRevision name for a DDA with the given spec hash. +func RevisionName(ddaName string, specHash string) string { + return fmt.Sprintf("%s-%s", ddaName, specHash) +} + +// serializeSpec serializes a DDA spec to JSON for ControllerRevision storage. +func serializeSpec(spec *v2alpha1.DatadogAgentSpec) ([]byte, error) { + return json.Marshal(spec) +} + +// deserializeSpec deserializes a DDA spec from ControllerRevision data. +func deserializeSpec(data []byte) (*v2alpha1.DatadogAgentSpec, error) { + spec := &v2alpha1.DatadogAgentSpec{} + if err := json.Unmarshal(data, spec); err != nil { + return nil, fmt.Errorf("failed to deserialize spec from revision: %w", err) + } + return spec, nil +} + +// CreateControllerRevision creates a ControllerRevision for the given DDA spec if one +// with the same hash does not already exist. Returns the revision name and whether it was newly created. +func CreateControllerRevision(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent, scheme *runtime.Scheme) (string, bool, error) { + hash, err := ComputeSpecHash(&dda.Spec) + if err != nil { + return "", false, err + } + + name := RevisionName(dda.Name, hash) + + // Check if it already exists + existing := &appsv1.ControllerRevision{} + err = c.Get(ctx, types.NamespacedName{Name: name, Namespace: dda.Namespace}, existing) + if err == nil { + return name, false, nil + } + if !apierrors.IsNotFound(err) { + return "", false, fmt.Errorf("failed to check for existing revision %s: %w", name, err) + } + + // Serialize spec + data, err := serializeSpec(&dda.Spec) + if err != nil { + return "", false, err + } + + cr := &appsv1.ControllerRevision{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: dda.Namespace, + }, + Data: runtime.RawExtension{Raw: data}, + } + + // Set owner reference + if err := controllerutil.SetControllerReference(dda, cr, scheme); err != nil { + return "", false, fmt.Errorf("failed to set owner reference: %w", err) + } + + if err := c.Create(ctx, cr); err != nil { + if apierrors.IsAlreadyExists(err) { + return name, false, nil + } + return "", false, fmt.Errorf("failed to create ControllerRevision %s: %w", name, err) + } + + return name, true, nil +} + +// GetControllerRevision fetches a ControllerRevision by name in the given namespace. +func GetControllerRevision(ctx context.Context, c client.Client, namespace, name string) (*appsv1.ControllerRevision, error) { + cr := &appsv1.ControllerRevision{} + if err := c.Get(ctx, types.NamespacedName{Name: name, Namespace: namespace}, cr); err != nil { + return nil, err + } + return cr, nil +} + +// RestoreSpecFromRevision reads a ControllerRevision and patches the DDA spec to match the stored spec. +func RestoreSpecFromRevision(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent, revisionName string) error { + cr, err := GetControllerRevision(ctx, c, dda.Namespace, revisionName) + if err != nil { + return fmt.Errorf("failed to get revision %s for restore: %w", revisionName, err) + } + + restoredSpec, err := deserializeSpec(cr.Data.Raw) + if err != nil { + return err + } + + // Update the DDA spec in the API server + ddaCopy := dda.DeepCopy() + ddaCopy.Spec = *restoredSpec + if err := c.Update(ctx, ddaCopy); err != nil { + return fmt.Errorf("failed to update DDA spec from revision %s: %w", revisionName, err) + } + + return nil +} + +// ListOwnedRevisions lists all ControllerRevisions in the DDA namespace that are owned by the DDA. +func ListOwnedRevisions(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent) ([]appsv1.ControllerRevision, error) { + revList := &appsv1.ControllerRevisionList{} + if err := c.List(ctx, revList, client.InNamespace(dda.Namespace)); err != nil { + return nil, fmt.Errorf("failed to list ControllerRevisions: %w", err) + } + + var owned []appsv1.ControllerRevision + for _, rev := range revList.Items { + if isOwnedBy(&rev, dda) { + owned = append(owned, rev) + } + } + return owned, nil +} + +// GarbageCollectRevisions deletes ControllerRevisions owned by the DDA that are not in the keep set. +func GarbageCollectRevisions(ctx context.Context, c client.Client, dda *v2alpha1.DatadogAgent, keep map[string]bool) error { + owned, err := ListOwnedRevisions(ctx, c, dda) + if err != nil { + return err + } + + for i := range owned { + if !keep[owned[i].Name] { + if err := c.Delete(ctx, &owned[i]); err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete ControllerRevision %s: %w", owned[i].Name, err) + } + } + } + return nil +} + +// isOwnedBy checks if a ControllerRevision is owned by the given DDA. +func isOwnedBy(cr *appsv1.ControllerRevision, dda *v2alpha1.DatadogAgent) bool { + for _, ref := range cr.OwnerReferences { + if ref.UID == dda.UID { + return true + } + } + return false +} diff --git a/internal/controller/datadogagent/experiment/types.go b/internal/controller/datadogagent/experiment/types.go new file mode 100644 index 000000000..72262c789 --- /dev/null +++ b/internal/controller/datadogagent/experiment/types.go @@ -0,0 +1,16 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package experiment + +import "time" + +const ( + // DefaultExperimentTimeout is the default duration before an unacknowledged experiment auto-rolls back. + DefaultExperimentTimeout = 30 * time.Minute + + // RevisionHashLength is the number of hex characters used from the MD5 hash for revision naming. + RevisionHashLength = 10 +) diff --git a/internal/controller/datadogagent/testutils/client_utils.go b/internal/controller/datadogagent/testutils/client_utils.go index 8908a8eb3..d1c182a4d 100644 --- a/internal/controller/datadogagent/testutils/client_utils.go +++ b/internal/controller/datadogagent/testutils/client_utils.go @@ -29,6 +29,8 @@ func TestScheme() *runtime.Scheme { s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.DatadogAgentProfileList{}) s.AddKnownTypes(appsv1.SchemeGroupVersion, &appsv1.DaemonSet{}) s.AddKnownTypes(appsv1.SchemeGroupVersion, &appsv1.Deployment{}) + s.AddKnownTypes(appsv1.SchemeGroupVersion, &appsv1.ControllerRevision{}) + s.AddKnownTypes(appsv1.SchemeGroupVersion, &appsv1.ControllerRevisionList{}) s.AddKnownTypes(corev1.SchemeGroupVersion, &corev1.Secret{}) s.AddKnownTypes(corev1.SchemeGroupVersion, &corev1.ServiceAccount{}) s.AddKnownTypes(corev1.SchemeGroupVersion, &corev1.ConfigMap{}) diff --git a/internal/controller/datadogagent_controller.go b/internal/controller/datadogagent_controller.go index d986ccff6..58565634f 100644 --- a/internal/controller/datadogagent_controller.go +++ b/internal/controller/datadogagent_controller.go @@ -198,7 +198,7 @@ type DatadogAgentReconciler struct { // +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=list;watch // +kubebuilder:rbac:groups="",resources=replicationcontrollers,verbs=get;list;watch // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=list;watch -// +kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=list;watch +// +kubebuilder:rbac:groups=apps,resources=controllerrevisions,verbs=get;list;watch;create;update;delete // +kubebuilder:rbac:groups=autoscaling,resources=horizontalpodautoscalers,verbs=list;watch // +kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=list;watch // +kubebuilder:rbac:groups=batch,resources=jobs,verbs=list;watch diff --git a/marketplaces/addon_manifest.yaml b/marketplaces/addon_manifest.yaml index 2c399c175..b25d1f58a 100644 --- a/marketplaces/addon_manifest.yaml +++ b/marketplaces/addon_manifest.yaml @@ -17822,7 +17822,11 @@ rules: resources: - controllerrevisions verbs: + - create + - delete + - get - list + - update - watch - apiGroups: - apps diff --git a/marketplaces/charts/google-marketplace/schema.yaml b/marketplaces/charts/google-marketplace/schema.yaml index a4b6b0379..82ffffb97 100644 --- a/marketplaces/charts/google-marketplace/schema.yaml +++ b/marketplaces/charts/google-marketplace/schema.yaml @@ -177,7 +177,11 @@ properties: resources: - controllerrevisions verbs: + - create + - delete + - get - list + - update - watch - apiGroups: - apps diff --git a/pkg/remoteconfig/experiment.go b/pkg/remoteconfig/experiment.go new file mode 100644 index 000000000..77e001337 --- /dev/null +++ b/pkg/remoteconfig/experiment.go @@ -0,0 +1,261 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package remoteconfig + +import ( + "context" + "encoding/json" + "fmt" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + kubeclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" + "github.com/DataDog/datadog-operator/internal/controller/datadogagent/defaults" + "github.com/DataDog/datadog-operator/internal/controller/datadogagent/experiment" +) + +// ExperimentAction represents the type of experiment signal from Fleet Automation. +type ExperimentAction string + +const ( + // ExperimentActionStart signals the operator to start a new experiment. + ExperimentActionStart ExperimentAction = "startExperiment" + // ExperimentActionStop signals the operator to stop (rollback) an experiment. + ExperimentActionStop ExperimentAction = "stopExperiment" + // ExperimentActionPromote signals the operator to promote an experiment. + ExperimentActionPromote ExperimentAction = "promoteExperiment" +) + +// ExperimentSignal is the RC payload for Fleet Automation experiment signals. +type ExperimentSignal struct { + // Action is the experiment command: startExperiment, stopExperiment, promoteExperiment. + Action ExperimentAction `json:"action"` + // ExperimentID is the unique ID for this experiment, set by FA. + ExperimentID string `json:"experiment_id"` + // Config is the DDA spec patch to apply (only present for startExperiment). + Config *v2alpha1.DatadogAgentSpec `json:"config,omitempty"` +} + +// parseExperimentSignal attempts to parse an RC payload as an experiment signal. +// Returns nil if the payload is not an experiment signal (i.e., a regular agent config). +func parseExperimentSignal(data []byte) (*ExperimentSignal, error) { + // Try to detect if this is an experiment signal by checking for the "action" field + var probe struct { + Action string `json:"action"` + } + if err := json.Unmarshal(data, &probe); err != nil { + return nil, fmt.Errorf("failed to unmarshal RC payload: %w", err) + } + if probe.Action == "" { + return nil, nil // Regular agent config, not an experiment signal + } + + signal := &ExperimentSignal{} + if err := json.Unmarshal(data, signal); err != nil { + return nil, fmt.Errorf("failed to unmarshal experiment signal: %w", err) + } + + switch signal.Action { + case ExperimentActionStart, ExperimentActionStop, ExperimentActionPromote: + return signal, nil + default: + return nil, fmt.Errorf("unknown experiment action: %s", signal.Action) + } +} + +// handleExperimentSignal processes an experiment signal from Fleet Automation. +// It updates the DDA status and (for startExperiment) patches the DDA spec. +func (r *RemoteConfigUpdater) handleExperimentSignal(ctx context.Context, signal *ExperimentSignal) error { + r.mu.Lock() + defer r.mu.Unlock() + + ddaList := &v2alpha1.DatadogAgentList{} + if err := r.kubeClient.List(ctx, ddaList); err != nil { + return fmt.Errorf("unable to list DatadogAgents: %w", err) + } + if len(ddaList.Items) == 0 { + return fmt.Errorf("cannot find any DatadogAgent") + } + dda := ddaList.Items[0] + + switch signal.Action { + case ExperimentActionStart: + return r.handleStartExperiment(ctx, &dda, signal) + case ExperimentActionStop: + return r.handleStopExperiment(ctx, &dda, signal) + case ExperimentActionPromote: + return r.handlePromoteExperiment(ctx, &dda, signal) + default: + return fmt.Errorf("unknown experiment action: %s", signal.Action) + } +} + +// handleStartExperiment sets experiment status to running and patches DDA spec. +// Order: status first (with baselineRevision), then spec patch. +func (r *RemoteConfigUpdater) handleStartExperiment(ctx context.Context, dda *v2alpha1.DatadogAgent, signal *ExperimentSignal) error { + if signal.Config == nil { + return fmt.Errorf("startExperiment signal missing config payload") + } + + // Reject if no baseline exists yet (DDA hasn't been reconciled). + // Without a baseline, rollback/timeout can't restore the previous spec. + if dda.Status.CurrentRevision == "" { + return fmt.Errorf("cannot start experiment: no currentRevision set (DDA not yet reconciled)") + } + + // Reject if any experiment lifecycle is active — FA must wait for the + // current experiment to be fully resolved (promoted, rolled back, timed out, + // or cleared) before starting a new one. This covers: + // - running: experiment in progress + // - rollback: stop received, restore pending + // - timeout: auto-rollback pending or just completed + // - aborted: awaiting FA acknowledgment + // - promoted: being cleared by reconciler + // + // The only exception is retrying the same experiment after a partial start + // failure (status updated but spec patch failed): ExpectedSpecHash is still + // set and the experiment ID matches. + if dda.Status.Experiment != nil { + exp := dda.Status.Experiment + // Allow retry of the same experiment if spec wasn't applied yet + if exp.Phase == v2alpha1.ExperimentPhaseRunning && + exp.ExpectedSpecHash != "" && + signal.ExperimentID == exp.ID { + r.logger.Info("Retrying startExperiment (prior attempt may have failed during spec patch)", + "id", signal.ExperimentID) + } else { + return fmt.Errorf("cannot start experiment %s: experiment %s is active (phase=%s)", + signal.ExperimentID, exp.ID, exp.Phase) + } + } + + // Compute hash of the FA payload AFTER applying defaults, so it matches + // the canonical form the reconciler uses (defaults.DefaultDatadogAgentSpec + // runs before HandleExperimentLifecycle). + configCopy := signal.Config.DeepCopy() + defaults.DefaultDatadogAgentSpec(configCopy) + specHash, err := experiment.ComputeSpecHash(configCopy) + if err != nil { + return fmt.Errorf("failed to compute spec hash for experiment config: %w", err) + } + + // Step 1: Update status — set phase=running, lock baseline, record expected hash + if err := r.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + BaselineRevision: dda.Status.CurrentRevision, + ID: signal.ExperimentID, + ExpectedSpecHash: specHash, + }); err != nil { + return fmt.Errorf("failed to set experiment status for start: %w", err) + } + + // Step 2: Re-fetch DDA to get updated resourceVersion after status update + refreshed := &v2alpha1.DatadogAgent{} + if err := r.kubeClient.Get(ctx, kubeclient.ObjectKeyFromObject(dda), refreshed); err != nil { + return fmt.Errorf("failed to re-fetch DDA after status update: %w", err) + } + + // Step 3: Patch DDA spec with experiment config + refreshed.Spec = *signal.Config + if err := r.kubeClient.Update(ctx, refreshed); err != nil { + return fmt.Errorf("failed to patch DDA spec for startExperiment: %w", err) + } + + r.logger.Info("Started experiment", "id", signal.ExperimentID) + return nil +} + +// handleStopExperiment sets experiment status to rollback. +// The reconciler will detect this and restore from baselineRevision. +func (r *RemoteConfigUpdater) handleStopExperiment(ctx context.Context, dda *v2alpha1.DatadogAgent, signal *ExperimentSignal) error { + if err := r.validateExperimentSignal(dda, signal, "stopExperiment"); err != nil { + return err + } + + if err := r.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRollback, + BaselineRevision: dda.Status.Experiment.BaselineRevision, + ID: dda.Status.Experiment.ID, + }); err != nil { + return fmt.Errorf("failed to set experiment status for stop: %w", err) + } + + r.logger.Info("Stopped experiment", "id", signal.ExperimentID) + return nil +} + +// handlePromoteExperiment sets experiment status to promoted. +// The reconciler will detect this and clear experiment state. +func (r *RemoteConfigUpdater) handlePromoteExperiment(ctx context.Context, dda *v2alpha1.DatadogAgent, signal *ExperimentSignal) error { + if err := r.validateExperimentSignal(dda, signal, "promoteExperiment"); err != nil { + return err + } + + if err := r.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhasePromoted, + ID: dda.Status.Experiment.ID, + // BaselineRevision and StartedAt cleared — reconciler will nil out experiment + }); err != nil { + return fmt.Errorf("failed to set experiment status for promote: %w", err) + } + + r.logger.Info("Promoted experiment", "id", signal.ExperimentID) + return nil +} + +// validateExperimentSignal checks that a stop/promote signal targets the +// currently running experiment. Returns nil if valid, a logged-and-ignored +// nil-error for mismatches that should be silently dropped. +func (r *RemoteConfigUpdater) validateExperimentSignal(dda *v2alpha1.DatadogAgent, signal *ExperimentSignal, action string) error { + if dda.Status.Experiment == nil || dda.Status.Experiment.Phase != v2alpha1.ExperimentPhaseRunning { + r.logger.Info(fmt.Sprintf("Ignoring %s: no running experiment", action), + "currentPhase", experimentPhase(dda)) + return fmt.Errorf("ignoring %s: no running experiment", action) + } + if signal.ExperimentID != "" && dda.Status.Experiment.ID != "" && + signal.ExperimentID != dda.Status.Experiment.ID { + r.logger.Info(fmt.Sprintf("Ignoring %s: experiment ID mismatch", action), + "signalID", signal.ExperimentID, + "runningID", dda.Status.Experiment.ID) + return fmt.Errorf("ignoring %s: signal targets experiment %s but %s is running", + action, signal.ExperimentID, dda.Status.Experiment.ID) + } + return nil +} + +// setExperimentStatus updates the DDA status with the given experiment state. +func (r *RemoteConfigUpdater) setExperimentStatus(ctx context.Context, dda *v2alpha1.DatadogAgent, experiment *v2alpha1.ExperimentStatus) error { + newStatus := dda.Status.DeepCopy() + newStatus.Experiment = experiment + + if apiequality.Semantic.DeepEqual(&dda.Status, newStatus) { + return nil // No change + } + + ddaUpdate := dda.DeepCopy() + ddaUpdate.Status = *newStatus + if err := r.kubeClient.Status().Update(ctx, ddaUpdate); err != nil { + if apierrors.IsConflict(err) { + r.logger.Info("unable to update experiment status due to conflict") + return fmt.Errorf("conflict updating experiment status: %w", err) + } + return err + } + + // Update the in-memory object so subsequent operations see the new status + dda.Status = *newStatus + return nil +} + +// experimentPhase returns the current experiment phase string, or "none" if no experiment. +func experimentPhase(dda *v2alpha1.DatadogAgent) string { + if dda.Status.Experiment == nil { + return "none" + } + return string(dda.Status.Experiment.Phase) +} diff --git a/pkg/remoteconfig/experiment_test.go b/pkg/remoteconfig/experiment_test.go new file mode 100644 index 000000000..3a0c9c718 --- /dev/null +++ b/pkg/remoteconfig/experiment_test.go @@ -0,0 +1,551 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package remoteconfig + +import ( + "context" + "encoding/json" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" + apiutils "github.com/DataDog/datadog-operator/api/utils" +) + +func init() { + logf.SetLogger(zap.New(zap.UseDevMode(true))) +} + +func testScheme() *runtime.Scheme { + s := runtime.NewScheme() + _ = v2alpha1.AddToScheme(s) + return s +} + +func newTestDDA() *v2alpha1.DatadogAgent { + return &v2alpha1.DatadogAgent{ + TypeMeta: metav1.TypeMeta{ + Kind: "DatadogAgent", + APIVersion: "datadoghq.com/v2alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "datadog-agent", + Namespace: "datadog", + }, + Spec: v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(false), + }, + }, + }, + } +} + +func newTestDDAWithExperiment(phase v2alpha1.ExperimentPhase) *v2alpha1.DatadogAgent { + dda := newTestDDA() + dda.Status = v2alpha1.DatadogAgentStatus{ + CurrentRevision: "datadog-agent-abc123", + Experiment: &v2alpha1.ExperimentStatus{ + Phase: phase, + BaselineRevision: "datadog-agent-baseline", + ID: "exp-001", + }, + } + return dda +} + +func newUpdater(objs ...runtime.Object) *RemoteConfigUpdater { + s := testScheme() + builder := fake.NewClientBuilder().WithScheme(s) + for _, obj := range objs { + builder = builder.WithRuntimeObjects(obj) + } + builder = builder.WithStatusSubresource(&v2alpha1.DatadogAgent{}) + c := builder.Build() + return &RemoteConfigUpdater{ + kubeClient: c, + logger: logr.New(logf.NullLogSink{}), + } +} + +func getDDA(t *testing.T, r *RemoteConfigUpdater) *v2alpha1.DatadogAgent { + t.Helper() + dda := &v2alpha1.DatadogAgent{} + err := r.kubeClient.Get(context.TODO(), types.NamespacedName{ + Name: "datadog-agent", + Namespace: "datadog", + }, dda) + require.NoError(t, err) + return dda +} + +// ============================================================================ +// parseExperimentSignal tests +// ============================================================================ + +func TestParseExperimentSignal_StartExperiment(t *testing.T) { + spec := v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + } + payload := ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-001", + Config: &spec, + } + data, _ := json.Marshal(payload) + + signal, err := parseExperimentSignal(data) + + require.NoError(t, err) + require.NotNil(t, signal) + assert.Equal(t, ExperimentActionStart, signal.Action) + assert.Equal(t, "exp-001", signal.ExperimentID) + assert.NotNil(t, signal.Config) + assert.True(t, apiutils.BoolValue(signal.Config.Features.APM.Enabled)) +} + +func TestParseExperimentSignal_StopExperiment(t *testing.T) { + payload := ExperimentSignal{ + Action: ExperimentActionStop, + ExperimentID: "exp-001", + } + data, _ := json.Marshal(payload) + + signal, err := parseExperimentSignal(data) + + require.NoError(t, err) + require.NotNil(t, signal) + assert.Equal(t, ExperimentActionStop, signal.Action) + assert.Nil(t, signal.Config) +} + +func TestParseExperimentSignal_PromoteExperiment(t *testing.T) { + payload := ExperimentSignal{ + Action: ExperimentActionPromote, + ExperimentID: "exp-001", + } + data, _ := json.Marshal(payload) + + signal, err := parseExperimentSignal(data) + + require.NoError(t, err) + require.NotNil(t, signal) + assert.Equal(t, ExperimentActionPromote, signal.Action) +} + +func TestParseExperimentSignal_RegularAgentConfig(t *testing.T) { + // Regular agent config (no "action" field) should return nil + payload := DatadogAgentRemoteConfig{ + ID: "config-001", + Name: "my-config", + CoreAgent: &CoreAgentFeaturesConfig{ + SBOM: &SbomConfig{Enabled: apiutils.NewBoolPointer(true)}, + }, + } + data, _ := json.Marshal(payload) + + signal, err := parseExperimentSignal(data) + + require.NoError(t, err) + assert.Nil(t, signal, "regular config should not be parsed as experiment signal") +} + +func TestParseExperimentSignal_UnknownAction(t *testing.T) { + data := []byte(`{"action": "unknownAction", "experiment_id": "exp-001"}`) + + signal, err := parseExperimentSignal(data) + + require.Error(t, err, "unknown action should return error to prevent fallthrough") + assert.Nil(t, signal) +} + +func TestParseExperimentSignal_InvalidJSON(t *testing.T) { + data := []byte(`not json`) + + signal, err := parseExperimentSignal(data) + + require.Error(t, err) + assert.Nil(t, signal) +} + +// ============================================================================ +// handleStartExperiment tests +// ============================================================================ + +func TestHandleStartExperiment_Success(t *testing.T) { + dda := newTestDDA() + dda.Status.CurrentRevision = "datadog-agent-baseline" + r := newUpdater(dda) + + newSpec := v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + } + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-001", + Config: &newSpec, + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.NoError(t, err) + + // Verify status + updated := getDDA(t, r) + require.NotNil(t, updated.Status.Experiment) + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, updated.Status.Experiment.Phase) + assert.Equal(t, "datadog-agent-baseline", updated.Status.Experiment.BaselineRevision) + assert.Equal(t, "exp-001", updated.Status.Experiment.ID) + + // Verify ExpectedSpecHash is set (computed from defaulted spec) + assert.NotEmpty(t, updated.Status.Experiment.ExpectedSpecHash, + "ExpectedSpecHash should be set for first-reconcile validation") + + // Verify spec was patched + assert.True(t, apiutils.BoolValue(updated.Spec.Features.APM.Enabled), + "APM should be enabled after startExperiment") +} + +func TestHandleStartExperiment_MissingConfig(t *testing.T) { + dda := newTestDDA() + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-001", + Config: nil, // Missing + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "missing config payload") +} + +func TestHandleStartExperiment_NoCurrentRevision(t *testing.T) { + dda := newTestDDA() + // No CurrentRevision set — DDA hasn't been reconciled yet + r := newUpdater(dda) + + newSpec := v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + } + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-001", + Config: &newSpec, + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "no currentRevision set") +} + +func TestHandleStartExperiment_AlreadyRunning(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseRunning) + r := newUpdater(dda) + + newSpec := v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + } + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-002", + Config: &newSpec, + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "active (phase=running)") + + // Verify original experiment is untouched + updated := getDDA(t, r) + assert.Equal(t, "exp-001", updated.Status.Experiment.ID, + "original experiment ID should be preserved") + assert.Equal(t, "datadog-agent-baseline", updated.Status.Experiment.BaselineRevision, + "original baseline should be preserved") +} + +func TestHandleStartExperiment_RetryAfterPartialFailure(t *testing.T) { + // Simulate: prior start set status (phase=running, expectedSpecHash set) + // but spec patch failed. ExpectedSpecHash still set = retry allowed. + dda := newTestDDA() + dda.Status = v2alpha1.DatadogAgentStatus{ + CurrentRevision: "datadog-agent-baseline", + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + BaselineRevision: "datadog-agent-baseline", + ID: "exp-001", + ExpectedSpecHash: "abc123", // Still set = spec wasn't validated yet + }, + } + r := newUpdater(dda) + + newSpec := v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + } + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-001", + Config: &newSpec, + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.NoError(t, err, "should allow retry when ExpectedSpecHash is still set") + + // Verify spec was patched on retry + updated := getDDA(t, r) + assert.True(t, apiutils.BoolValue(updated.Spec.Features.APM.Enabled), + "APM should be enabled after retry") +} + +func TestHandleStartExperiment_DifferentIDDuringRetryWindow(t *testing.T) { + // Prior start for exp-001 partially failed (ExpectedSpecHash still set). + // A different experiment (exp-002) tries to start — should be rejected. + dda := newTestDDA() + dda.Status = v2alpha1.DatadogAgentStatus{ + CurrentRevision: "datadog-agent-baseline", + Experiment: &v2alpha1.ExperimentStatus{ + Phase: v2alpha1.ExperimentPhaseRunning, + BaselineRevision: "datadog-agent-baseline", + ID: "exp-001", + ExpectedSpecHash: "abc123", + }, + } + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-002", // Different ID + Config: &v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + }, + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "active (phase=running)") + + // Original experiment should be untouched + updated := getDDA(t, r) + assert.Equal(t, "exp-001", updated.Status.Experiment.ID) +} + +func TestHandleStartExperiment_DuringRollback(t *testing.T) { + // Stop was received (phase=rollback) but reconciler hasn't restored yet. + // A new start should be rejected to avoid overwriting the pending rollback. + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseRollback) + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-002", + Config: &v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + }, + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "active (phase=rollback)") + + // Original experiment should be untouched — rollback still pending + updated := getDDA(t, r) + assert.Equal(t, v2alpha1.ExperimentPhaseRollback, updated.Status.Experiment.Phase) + assert.Equal(t, "exp-001", updated.Status.Experiment.ID) +} + +// ============================================================================ +// handleStopExperiment tests +// ============================================================================ + +func TestHandleStopExperiment_RunningExperiment(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseRunning) + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStop, + ExperimentID: "exp-001", + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.NoError(t, err) + + updated := getDDA(t, r) + require.NotNil(t, updated.Status.Experiment) + assert.Equal(t, v2alpha1.ExperimentPhaseRollback, updated.Status.Experiment.Phase) + assert.Equal(t, "datadog-agent-baseline", updated.Status.Experiment.BaselineRevision, + "baseline should be preserved for rollback") +} + +func TestHandleStopExperiment_NoRunningExperiment(t *testing.T) { + dda := newTestDDA() // No experiment + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStop, + ExperimentID: "exp-001", + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "no running experiment") +} + +func TestHandleStopExperiment_AbortedExperiment(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseAborted) + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStop, + ExperimentID: "exp-001", + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "no running experiment") +} + +func TestHandleStopExperiment_IDMismatch(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseRunning) // ID = "exp-001" + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStop, + ExperimentID: "exp-999", // Wrong ID + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "signal targets experiment") + + // Original experiment should be untouched + updated := getDDA(t, r) + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, updated.Status.Experiment.Phase) +} + +// ============================================================================ +// handlePromoteExperiment tests +// ============================================================================ + +func TestHandlePromoteExperiment_RunningExperiment(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseRunning) + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionPromote, + ExperimentID: "exp-001", + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.NoError(t, err) + + updated := getDDA(t, r) + require.NotNil(t, updated.Status.Experiment) + assert.Equal(t, v2alpha1.ExperimentPhasePromoted, updated.Status.Experiment.Phase) + assert.Empty(t, updated.Status.Experiment.BaselineRevision, + "baseline should be cleared on promote") +} + +func TestHandlePromoteExperiment_NoRunningExperiment(t *testing.T) { + dda := newTestDDA() + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionPromote, + ExperimentID: "exp-001", + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "no running experiment") +} + +func TestHandlePromoteExperiment_AbortedExperiment(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseAborted) + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionPromote, + ExperimentID: "exp-001", + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "no running experiment") +} + +func TestHandlePromoteExperiment_IDMismatch(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseRunning) // ID = "exp-001" + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionPromote, + ExperimentID: "exp-999", // Wrong ID + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.Error(t, err) + assert.Contains(t, err.Error(), "signal targets experiment") + + // Original experiment should still be running + updated := getDDA(t, r) + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, updated.Status.Experiment.Phase) +} + +// ============================================================================ +// experimentPhase helper tests +// ============================================================================ + +func TestExperimentPhase_NoExperiment(t *testing.T) { + dda := newTestDDA() + assert.Equal(t, "none", experimentPhase(dda)) +} + +func TestExperimentPhase_Running(t *testing.T) { + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseRunning) + assert.Equal(t, "running", experimentPhase(dda)) +} diff --git a/pkg/remoteconfig/updater.go b/pkg/remoteconfig/updater.go index 7dc51dc3d..85159d404 100644 --- a/pkg/remoteconfig/updater.go +++ b/pkg/remoteconfig/updater.go @@ -304,7 +304,38 @@ func (r *RemoteConfigUpdater) agentConfigUpdateCallback(updates map[string]state configIDs = append(configIDs, id) } - mergedUpdate, err := r.parseReceivedUpdates(updates, applyStatus) + // Separate experiment signals from regular configs. Process experiment + // signals first; remaining configs continue through the regular path. + regularUpdates := make(map[string]state.RawConfig) + regularConfigIDs := make([]string, 0, len(updates)) + for configPath, c := range updates { + signal, err := parseExperimentSignal(c.Config) + if err != nil { + r.logger.Error(err, "Failed to parse experiment signal") + applyStatus(configPath, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()}) + return + } + if signal != nil { + r.logger.Info("Received experiment signal", "action", signal.Action, "id", signal.ExperimentID) + if err := r.handleExperimentSignal(ctx, signal); err != nil { + r.logger.Error(err, "Failed to handle experiment signal") + applyStatus(configPath, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()}) + return + } + applyStatus(configPath, state.ApplyStatus{State: state.ApplyStateAcknowledged, Error: ""}) + } else { + regularUpdates[configPath] = c + regularConfigIDs = append(regularConfigIDs, configPath) + } + } + + // If all updates were experiment signals, we're done. + if len(regularUpdates) == 0 { + return + } + + // Regular agent config path — only non-experiment updates remain + mergedUpdate, err := r.parseReceivedUpdates(regularUpdates, applyStatus) if err != nil { r.logger.Error(err, "Failed to merge updates") return @@ -319,12 +350,14 @@ func (r *RemoteConfigUpdater) agentConfigUpdateCallback(updates map[string]state if err := r.getAndUpdateDatadogAgentWithRetry(ctx, mergedUpdate, r.updateInstanceStatus); err != nil { r.logger.Error(err, "Failed to update status") - applyStatus(configIDs[len(configIDs)-1], state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()}) + for _, id := range regularConfigIDs { + applyStatus(id, state.ApplyStatus{State: state.ApplyStateError, Error: err.Error()}) + } return } - // Acknowledge that configs were received - for _, id := range configIDs { + // Acknowledge regular configs + for _, id := range regularConfigIDs { applyStatus(id, state.ApplyStatus{State: state.ApplyStateAcknowledged, Error: ""}) } From e86d0761835a90ee05618226651d559d61058c2c Mon Sep 17 00:00:00 2001 From: Minyi Zhu Date: Tue, 17 Mar 2026 14:18:53 +0100 Subject: [PATCH 2/4] Merge FA patch --- go.mod | 2 +- pkg/remoteconfig/experiment.go | 67 +++++++++++++++++------------ pkg/remoteconfig/experiment_test.go | 34 ++++++++++++++- 3 files changed, 73 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 8f098edbf..4a1477590 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( ) require ( + dario.cat/mergo v1.0.2 github.com/DataDog/datadog-agent/pkg/config/model v0.59.0-rc.5 github.com/DataDog/datadog-agent/pkg/config/remote v0.59.0-rc.5 github.com/DataDog/datadog-agent/pkg/remoteconfig/state v0.59.0-rc.5 @@ -74,7 +75,6 @@ require ( require ( cel.dev/expr v0.24.0 // indirect - dario.cat/mergo v1.0.2 // indirect github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect github.com/BurntSushi/toml v1.5.0 // indirect github.com/DataDog/appsec-internal-go v1.7.0 // indirect diff --git a/pkg/remoteconfig/experiment.go b/pkg/remoteconfig/experiment.go index 77e001337..321ed6e4f 100644 --- a/pkg/remoteconfig/experiment.go +++ b/pkg/remoteconfig/experiment.go @@ -10,6 +10,7 @@ import ( "encoding/json" "fmt" + "dario.cat/mergo" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" kubeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -108,38 +109,48 @@ func (r *RemoteConfigUpdater) handleStartExperiment(ctx context.Context, dda *v2 return fmt.Errorf("cannot start experiment: no currentRevision set (DDA not yet reconciled)") } - // Reject if any experiment lifecycle is active — FA must wait for the - // current experiment to be fully resolved (promoted, rolled back, timed out, - // or cleared) before starting a new one. This covers: - // - running: experiment in progress - // - rollback: stop received, restore pending - // - timeout: auto-rollback pending or just completed - // - aborted: awaiting FA acknowledgment - // - promoted: being cleared by reconciler - // - // The only exception is retrying the same experiment after a partial start - // failure (status updated but spec patch failed): ExpectedSpecHash is still - // set and the experiment ID matches. + // Guard against starting an experiment while one is actively in progress. + // Terminal phases (aborted, timeout, promoted) are safe to overwrite — + // these experiments are done and a new start should clear them. + // Active phases (running, rollback) must be resolved first. if dda.Status.Experiment != nil { exp := dda.Status.Experiment - // Allow retry of the same experiment if spec wasn't applied yet - if exp.Phase == v2alpha1.ExperimentPhaseRunning && - exp.ExpectedSpecHash != "" && - signal.ExperimentID == exp.ID { - r.logger.Info("Retrying startExperiment (prior attempt may have failed during spec patch)", - "id", signal.ExperimentID) - } else { + switch exp.Phase { + case v2alpha1.ExperimentPhaseAborted, v2alpha1.ExperimentPhaseTimeout, v2alpha1.ExperimentPhasePromoted: + // Terminal — safe to start a new experiment, will overwrite + r.logger.Info("Starting new experiment, replacing terminal experiment", + "oldID", exp.ID, "oldPhase", exp.Phase, "newID", signal.ExperimentID) + + case v2alpha1.ExperimentPhaseRunning: + // Allow retry of the same experiment if spec wasn't applied yet + if exp.ExpectedSpecHash != "" && signal.ExperimentID == exp.ID { + r.logger.Info("Retrying startExperiment (prior attempt may have failed during spec patch)", + "id", signal.ExperimentID) + } else { + return fmt.Errorf("cannot start experiment %s: experiment %s is active (phase=%s)", + signal.ExperimentID, exp.ID, exp.Phase) + } + + case v2alpha1.ExperimentPhaseRollback: + // Rollback in progress — must complete before starting new experiment return fmt.Errorf("cannot start experiment %s: experiment %s is active (phase=%s)", signal.ExperimentID, exp.ID, exp.Phase) } } - // Compute hash of the FA payload AFTER applying defaults, so it matches - // the canonical form the reconciler uses (defaults.DefaultDatadogAgentSpec - // runs before HandleExperimentLifecycle). - configCopy := signal.Config.DeepCopy() - defaults.DefaultDatadogAgentSpec(configCopy) - specHash, err := experiment.ComputeSpecHash(configCopy) + // Merge the FA patch into the current spec. FA sends only the fields it + // wants to change; everything else is preserved from the current DDA spec. + mergedSpec := dda.Spec.DeepCopy() + if err := mergo.Merge(mergedSpec, signal.Config, mergo.WithOverride); err != nil { + return fmt.Errorf("failed to merge experiment config into current spec: %w", err) + } + + // Compute hash of the MERGED+DEFAULTED spec, so it matches the canonical + // form the reconciler uses (defaults.DefaultDatadogAgentSpec runs before + // HandleExperimentLifecycle). + defaultedSpec := mergedSpec.DeepCopy() + defaults.DefaultDatadogAgentSpec(defaultedSpec) + specHash, err := experiment.ComputeSpecHash(defaultedSpec) if err != nil { return fmt.Errorf("failed to compute spec hash for experiment config: %w", err) } @@ -160,10 +171,10 @@ func (r *RemoteConfigUpdater) handleStartExperiment(ctx context.Context, dda *v2 return fmt.Errorf("failed to re-fetch DDA after status update: %w", err) } - // Step 3: Patch DDA spec with experiment config - refreshed.Spec = *signal.Config + // Step 3: Apply merged spec to DDA + refreshed.Spec = *mergedSpec if err := r.kubeClient.Update(ctx, refreshed); err != nil { - return fmt.Errorf("failed to patch DDA spec for startExperiment: %w", err) + return fmt.Errorf("failed to update DDA spec for startExperiment: %w", err) } r.logger.Info("Started experiment", "id", signal.ExperimentID) diff --git a/pkg/remoteconfig/experiment_test.go b/pkg/remoteconfig/experiment_test.go index 3a0c9c718..13e86b27d 100644 --- a/pkg/remoteconfig/experiment_test.go +++ b/pkg/remoteconfig/experiment_test.go @@ -49,6 +49,9 @@ func newTestDDA() *v2alpha1.DatadogAgent { APM: &v2alpha1.APMFeatureConfig{ Enabled: apiutils.NewBoolPointer(false), }, + NPM: &v2alpha1.NPMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, }, }, } @@ -222,9 +225,11 @@ func TestHandleStartExperiment_Success(t *testing.T) { assert.NotEmpty(t, updated.Status.Experiment.ExpectedSpecHash, "ExpectedSpecHash should be set for first-reconcile validation") - // Verify spec was patched + // Verify spec was merged (not replaced) assert.True(t, apiutils.BoolValue(updated.Spec.Features.APM.Enabled), "APM should be enabled after startExperiment") + assert.True(t, apiutils.BoolValue(updated.Spec.Features.NPM.Enabled), + "NPM should be preserved from original spec (merge, not replace)") } func TestHandleStartExperiment_MissingConfig(t *testing.T) { @@ -398,6 +403,33 @@ func TestHandleStartExperiment_DuringRollback(t *testing.T) { assert.Equal(t, "exp-001", updated.Status.Experiment.ID) } +func TestHandleStartExperiment_AfterAborted(t *testing.T) { + // A prior experiment was aborted (external edit). A new start should + // clear the aborted state and start fresh — not get stuck forever. + dda := newTestDDAWithExperiment(v2alpha1.ExperimentPhaseAborted) + r := newUpdater(dda) + + signal := &ExperimentSignal{ + Action: ExperimentActionStart, + ExperimentID: "exp-002", + Config: &v2alpha1.DatadogAgentSpec{ + Features: &v2alpha1.DatadogFeatures{ + APM: &v2alpha1.APMFeatureConfig{ + Enabled: apiutils.NewBoolPointer(true), + }, + }, + }, + } + + err := r.handleExperimentSignal(context.TODO(), signal) + require.NoError(t, err, "should allow new experiment after aborted") + + updated := getDDA(t, r) + require.NotNil(t, updated.Status.Experiment) + assert.Equal(t, v2alpha1.ExperimentPhaseRunning, updated.Status.Experiment.Phase) + assert.Equal(t, "exp-002", updated.Status.Experiment.ID) +} + // ============================================================================ // handleStopExperiment tests // ============================================================================ From 782e64e4232243357f8e0938d2320dc26f0c543f Mon Sep 17 00:00:00 2001 From: Minyi Zhu Date: Tue, 17 Mar 2026 14:47:57 +0100 Subject: [PATCH 3/4] fix for stale stop/promote signal scenario --- pkg/remoteconfig/experiment.go | 22 +++++++++++++--------- pkg/remoteconfig/experiment_test.go | 18 ++++++------------ 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/pkg/remoteconfig/experiment.go b/pkg/remoteconfig/experiment.go index 321ed6e4f..6ae96548b 100644 --- a/pkg/remoteconfig/experiment.go +++ b/pkg/remoteconfig/experiment.go @@ -184,8 +184,10 @@ func (r *RemoteConfigUpdater) handleStartExperiment(ctx context.Context, dda *v2 // handleStopExperiment sets experiment status to rollback. // The reconciler will detect this and restore from baselineRevision. func (r *RemoteConfigUpdater) handleStopExperiment(ctx context.Context, dda *v2alpha1.DatadogAgent, signal *ExperimentSignal) error { - if err := r.validateExperimentSignal(dda, signal, "stopExperiment"); err != nil { + if valid, err := r.validateExperimentSignal(dda, signal, "stopExperiment"); err != nil { return err + } else if !valid { + return nil } if err := r.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{ @@ -203,8 +205,10 @@ func (r *RemoteConfigUpdater) handleStopExperiment(ctx context.Context, dda *v2a // handlePromoteExperiment sets experiment status to promoted. // The reconciler will detect this and clear experiment state. func (r *RemoteConfigUpdater) handlePromoteExperiment(ctx context.Context, dda *v2alpha1.DatadogAgent, signal *ExperimentSignal) error { - if err := r.validateExperimentSignal(dda, signal, "promoteExperiment"); err != nil { + if valid, err := r.validateExperimentSignal(dda, signal, "promoteExperiment"); err != nil { return err + } else if !valid { + return nil } if err := r.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{ @@ -220,23 +224,23 @@ func (r *RemoteConfigUpdater) handlePromoteExperiment(ctx context.Context, dda * } // validateExperimentSignal checks that a stop/promote signal targets the -// currently running experiment. Returns nil if valid, a logged-and-ignored -// nil-error for mismatches that should be silently dropped. -func (r *RemoteConfigUpdater) validateExperimentSignal(dda *v2alpha1.DatadogAgent, signal *ExperimentSignal, action string) error { +// currently running experiment. Returns (true, nil) if valid. Returns +// (false, nil) for stale/mismatched signals — these are logged and silently +// dropped so they don't block processing of other RC updates. +func (r *RemoteConfigUpdater) validateExperimentSignal(dda *v2alpha1.DatadogAgent, signal *ExperimentSignal, action string) (bool, error) { if dda.Status.Experiment == nil || dda.Status.Experiment.Phase != v2alpha1.ExperimentPhaseRunning { r.logger.Info(fmt.Sprintf("Ignoring %s: no running experiment", action), "currentPhase", experimentPhase(dda)) - return fmt.Errorf("ignoring %s: no running experiment", action) + return false, nil } if signal.ExperimentID != "" && dda.Status.Experiment.ID != "" && signal.ExperimentID != dda.Status.Experiment.ID { r.logger.Info(fmt.Sprintf("Ignoring %s: experiment ID mismatch", action), "signalID", signal.ExperimentID, "runningID", dda.Status.Experiment.ID) - return fmt.Errorf("ignoring %s: signal targets experiment %s but %s is running", - action, signal.ExperimentID, dda.Status.Experiment.ID) + return false, nil } - return nil + return true, nil } // setExperimentStatus updates the DDA status with the given experiment state. diff --git a/pkg/remoteconfig/experiment_test.go b/pkg/remoteconfig/experiment_test.go index 13e86b27d..6b87601ef 100644 --- a/pkg/remoteconfig/experiment_test.go +++ b/pkg/remoteconfig/experiment_test.go @@ -463,8 +463,7 @@ func TestHandleStopExperiment_NoRunningExperiment(t *testing.T) { } err := r.handleExperimentSignal(context.TODO(), signal) - require.Error(t, err) - assert.Contains(t, err.Error(), "no running experiment") + require.NoError(t, err, "stale signal should be silently ignored") } func TestHandleStopExperiment_AbortedExperiment(t *testing.T) { @@ -477,8 +476,7 @@ func TestHandleStopExperiment_AbortedExperiment(t *testing.T) { } err := r.handleExperimentSignal(context.TODO(), signal) - require.Error(t, err) - assert.Contains(t, err.Error(), "no running experiment") + require.NoError(t, err, "stale signal should be silently ignored") } func TestHandleStopExperiment_IDMismatch(t *testing.T) { @@ -491,8 +489,7 @@ func TestHandleStopExperiment_IDMismatch(t *testing.T) { } err := r.handleExperimentSignal(context.TODO(), signal) - require.Error(t, err) - assert.Contains(t, err.Error(), "signal targets experiment") + require.NoError(t, err, "mismatched signal should be silently ignored") // Original experiment should be untouched updated := getDDA(t, r) @@ -532,8 +529,7 @@ func TestHandlePromoteExperiment_NoRunningExperiment(t *testing.T) { } err := r.handleExperimentSignal(context.TODO(), signal) - require.Error(t, err) - assert.Contains(t, err.Error(), "no running experiment") + require.NoError(t, err, "stale signal should be silently ignored") } func TestHandlePromoteExperiment_AbortedExperiment(t *testing.T) { @@ -546,8 +542,7 @@ func TestHandlePromoteExperiment_AbortedExperiment(t *testing.T) { } err := r.handleExperimentSignal(context.TODO(), signal) - require.Error(t, err) - assert.Contains(t, err.Error(), "no running experiment") + require.NoError(t, err, "stale signal should be silently ignored") } func TestHandlePromoteExperiment_IDMismatch(t *testing.T) { @@ -560,8 +555,7 @@ func TestHandlePromoteExperiment_IDMismatch(t *testing.T) { } err := r.handleExperimentSignal(context.TODO(), signal) - require.Error(t, err) - assert.Contains(t, err.Error(), "signal targets experiment") + require.NoError(t, err, "mismatched signal should be silently ignored") // Original experiment should still be running updated := getDDA(t, r) From 72fd331614e708b700628b53b82132fcbaa6f673 Mon Sep 17 00:00:00 2001 From: Minyi Zhu Date: Tue, 17 Mar 2026 15:03:22 +0100 Subject: [PATCH 4/4] Experiment ID is enforced --- api/datadoghq/v2alpha1/datadogagent_types.go | 1 + pkg/remoteconfig/experiment.go | 111 ++++++++++++------- 2 files changed, 72 insertions(+), 40 deletions(-) diff --git a/api/datadoghq/v2alpha1/datadogagent_types.go b/api/datadoghq/v2alpha1/datadogagent_types.go index 5c7f64bc0..0f687f617 100644 --- a/api/datadoghq/v2alpha1/datadogagent_types.go +++ b/api/datadoghq/v2alpha1/datadogagent_types.go @@ -2351,6 +2351,7 @@ type ExperimentStatus struct { // +optional BaselineRevision string `json:"baselineRevision,omitempty"` // ID is the unique experiment ID sent by Fleet Automation. + // Optional in the CRD schema, but required by the RC signal handler. // +optional ID string `json:"id,omitempty"` // ExpectedSpecHash is the truncated MD5 hash of the spec that FA sent in diff --git a/pkg/remoteconfig/experiment.go b/pkg/remoteconfig/experiment.go index 6ae96548b..bc160b34e 100644 --- a/pkg/remoteconfig/experiment.go +++ b/pkg/remoteconfig/experiment.go @@ -84,6 +84,10 @@ func (r *RemoteConfigUpdater) handleExperimentSignal(ctx context.Context, signal } dda := ddaList.Items[0] + if signal.ExperimentID == "" { + return fmt.Errorf("experiment signal missing experiment_id") + } + switch signal.Action { case ExperimentActionStart: return r.handleStartExperiment(ctx, &dda, signal) @@ -138,41 +142,54 @@ func (r *RemoteConfigUpdater) handleStartExperiment(ctx context.Context, dda *v2 } } - // Merge the FA patch into the current spec. FA sends only the fields it - // wants to change; everything else is preserved from the current DDA spec. - mergedSpec := dda.Spec.DeepCopy() - if err := mergo.Merge(mergedSpec, signal.Config, mergo.WithOverride); err != nil { - return fmt.Errorf("failed to merge experiment config into current spec: %w", err) - } - - // Compute hash of the MERGED+DEFAULTED spec, so it matches the canonical - // form the reconciler uses (defaults.DefaultDatadogAgentSpec runs before - // HandleExperimentLifecycle). - defaultedSpec := mergedSpec.DeepCopy() - defaults.DefaultDatadogAgentSpec(defaultedSpec) - specHash, err := experiment.ComputeSpecHash(defaultedSpec) - if err != nil { - return fmt.Errorf("failed to compute spec hash for experiment config: %w", err) - } - - // Step 1: Update status — set phase=running, lock baseline, record expected hash + // Step 1: Update status — set phase=running, lock baseline. + // ExpectedSpecHash is computed later from the refreshed spec to avoid races. if err := r.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{ Phase: v2alpha1.ExperimentPhaseRunning, BaselineRevision: dda.Status.CurrentRevision, ID: signal.ExperimentID, - ExpectedSpecHash: specHash, }); err != nil { return fmt.Errorf("failed to set experiment status for start: %w", err) } - // Step 2: Re-fetch DDA to get updated resourceVersion after status update + // Step 2: Re-fetch DDA to get the latest spec and resourceVersion. + // This ensures we merge into the most recent spec, not a stale copy + // that could silently overwrite concurrent edits. refreshed := &v2alpha1.DatadogAgent{} if err := r.kubeClient.Get(ctx, kubeclient.ObjectKeyFromObject(dda), refreshed); err != nil { return fmt.Errorf("failed to re-fetch DDA after status update: %w", err) } - // Step 3: Apply merged spec to DDA - refreshed.Spec = *mergedSpec + // Step 3: Merge FA patch into the refreshed spec. + if err := mergo.Merge(&refreshed.Spec, signal.Config, mergo.WithOverride); err != nil { + return fmt.Errorf("failed to merge experiment config into current spec: %w", err) + } + + // Step 4: Compute ExpectedSpecHash from the merged+defaulted spec and + // update the experiment status with it. + defaultedSpec := refreshed.Spec.DeepCopy() + defaults.DefaultDatadogAgentSpec(defaultedSpec) + specHash, err := experiment.ComputeSpecHash(defaultedSpec) + if err != nil { + return fmt.Errorf("failed to compute spec hash for experiment config: %w", err) + } + if err := r.statusUpdateWithRetry(ctx, refreshed, func(d *v2alpha1.DatadogAgent) { + if d.Status.Experiment != nil { + d.Status.Experiment.ExpectedSpecHash = specHash + } + }); err != nil { + return fmt.Errorf("failed to update ExpectedSpecHash: %w", err) + } + + // Step 5: Re-fetch again since status update bumped resourceVersion, + // then apply the merged spec. + if err := r.kubeClient.Get(ctx, kubeclient.ObjectKeyFromObject(dda), refreshed); err != nil { + return fmt.Errorf("failed to re-fetch DDA after hash update: %w", err) + } + // Re-merge into the latest spec (in case it changed during hash update) + if err := mergo.Merge(&refreshed.Spec, signal.Config, mergo.WithOverride); err != nil { + return fmt.Errorf("failed to re-merge experiment config: %w", err) + } if err := r.kubeClient.Update(ctx, refreshed); err != nil { return fmt.Errorf("failed to update DDA spec for startExperiment: %w", err) } @@ -243,28 +260,42 @@ func (r *RemoteConfigUpdater) validateExperimentSignal(dda *v2alpha1.DatadogAgen return true, nil } -// setExperimentStatus updates the DDA status with the given experiment state. -func (r *RemoteConfigUpdater) setExperimentStatus(ctx context.Context, dda *v2alpha1.DatadogAgent, experiment *v2alpha1.ExperimentStatus) error { - newStatus := dda.Status.DeepCopy() - newStatus.Experiment = experiment - - if apiequality.Semantic.DeepEqual(&dda.Status, newStatus) { - return nil // No change - } +// statusUpdateWithRetry applies a status mutation to the DDA and persists it. +// On conflict errors, it re-fetches the DDA and retries (up to 3 times) to +// handle concurrent status writes from the reconciler. The mutate function +// is called on each attempt with the latest DDA to apply the desired change. +func (r *RemoteConfigUpdater) statusUpdateWithRetry(ctx context.Context, dda *v2alpha1.DatadogAgent, mutate func(*v2alpha1.DatadogAgent)) error { + const maxRetries = 3 + for i := range maxRetries { + ddaUpdate := dda.DeepCopy() + mutate(ddaUpdate) + + if apiequality.Semantic.DeepEqual(&dda.Status, &ddaUpdate.Status) { + return nil // No change + } - ddaUpdate := dda.DeepCopy() - ddaUpdate.Status = *newStatus - if err := r.kubeClient.Status().Update(ctx, ddaUpdate); err != nil { - if apierrors.IsConflict(err) { - r.logger.Info("unable to update experiment status due to conflict") - return fmt.Errorf("conflict updating experiment status: %w", err) + updateErr := r.kubeClient.Status().Update(ctx, ddaUpdate) + if updateErr == nil { + dda.Status = ddaUpdate.Status + return nil + } + if !apierrors.IsConflict(updateErr) { + return updateErr + } + r.logger.Info("Status update conflict, retrying", + "attempt", i+1, "maxRetries", maxRetries) + if getErr := r.kubeClient.Get(ctx, kubeclient.ObjectKeyFromObject(dda), dda); getErr != nil { + return fmt.Errorf("failed to re-fetch DDA after status conflict: %w", getErr) } - return err } + return fmt.Errorf("failed to update status after %d retries due to conflicts", maxRetries) +} - // Update the in-memory object so subsequent operations see the new status - dda.Status = *newStatus - return nil +// setExperimentStatus updates the DDA experiment status using statusUpdateWithRetry. +func (r *RemoteConfigUpdater) setExperimentStatus(ctx context.Context, dda *v2alpha1.DatadogAgent, experimentStatus *v2alpha1.ExperimentStatus) error { + return r.statusUpdateWithRetry(ctx, dda, func(d *v2alpha1.DatadogAgent) { + d.Status.Experiment = experimentStatus + }) } // experimentPhase returns the current experiment phase string, or "none" if no experiment.