diff --git a/api/datadoghq/v2alpha1/datadogagent_types.go b/api/datadoghq/v2alpha1/datadogagent_types.go
index caa0f99a9..f16a8fe02 100644
--- a/api/datadoghq/v2alpha1/datadogagent_types.go
+++ b/api/datadoghq/v2alpha1/datadogagent_types.go
@@ -2349,6 +2349,40 @@ type RemoteConfigConfiguration struct {
 	Features *DatadogFeatures `json:"features,omitempty"`
 }
 
+// ExperimentPhase represents the current phase of a Fleet Automation experiment.
+// +kubebuilder:validation:Enum=running;stopped;rollback;timeout;promoted;aborted
+type ExperimentPhase string
+
+const (
+	// ExperimentPhaseRunning is set when an experiment is started; it blocks the start of another experiment.
+	ExperimentPhaseRunning ExperimentPhase = "running"
+	// ExperimentPhaseStopped is set when a running experiment is stopped; it blocks the start of another experiment.
+	ExperimentPhaseStopped ExperimentPhase = "stopped"
+	// ExperimentPhaseRollback blocks the start of another experiment; presumably set while changes are reverted (TODO confirm).
+	ExperimentPhaseRollback ExperimentPhase = "rollback"
+	// ExperimentPhasePromoted is set when a running experiment is promoted; a new experiment may start afterwards.
+	ExperimentPhasePromoted ExperimentPhase = "promoted"
+	// ExperimentPhaseAborted does not block the start of another experiment; NOTE(review): confirm what sets it.
+	ExperimentPhaseAborted ExperimentPhase = "aborted"
+	// ExperimentPhaseTimeout does not block the start of another experiment; NOTE(review): confirm what sets it.
+	ExperimentPhaseTimeout ExperimentPhase = "timeout"
+)
+
+// ExperimentStatus tracks the state of a Fleet Automation experiment.
+// +k8s:openapi-gen=true
+type ExperimentStatus struct {
+	// Phase is the current state of the experiment.
+	// +optional
+	Phase ExperimentPhase `json:"phase,omitempty"`
+	// ID is the unique experiment ID sent by Fleet Automation.
+	// +optional
+	ID string `json:"id,omitempty"`
+	// Generation is the DDA metadata.generation recorded when the experiment started.
+	// Used to detect manual spec changes during an experiment.
+	// +optional
+	Generation int64 `json:"generation,omitempty"`
+}
+
 // DatadogAgentStatus defines the observed state of DatadogAgent.
 // +k8s:openapi-gen=true
 type DatadogAgentStatus struct {
@@ -2376,6 +2410,9 @@ type DatadogAgentStatus struct {
 	// RemoteConfigConfiguration stores the configuration received from RemoteConfig.
 	// +optional
 	RemoteConfigConfiguration *RemoteConfigConfiguration `json:"remoteConfigConfiguration,omitempty"`
+	// Experiment tracks the state of an active or recent Fleet Automation experiment.
+ // +optional + Experiment *ExperimentStatus `json:"experiment,omitempty"` } // DatadogAgent defines Agent configuration, see reference https://github.com/DataDog/datadog-operator/blob/main/docs/configuration.v2alpha1.md diff --git a/api/datadoghq/v2alpha1/zz_generated.deepcopy.go b/api/datadoghq/v2alpha1/zz_generated.deepcopy.go index 4733c0d92..fee99ea23 100644 --- a/api/datadoghq/v2alpha1/zz_generated.deepcopy.go +++ b/api/datadoghq/v2alpha1/zz_generated.deepcopy.go @@ -1361,6 +1361,11 @@ func (in *DatadogAgentStatus) DeepCopyInto(out *DatadogAgentStatus) { *out = new(RemoteConfigConfiguration) (*in).DeepCopyInto(*out) } + if in.Experiment != nil { + in, out := &in.Experiment, &out.Experiment + *out = new(ExperimentStatus) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatadogAgentStatus. @@ -1764,6 +1769,21 @@ func (in *EventTypes) DeepCopy() *EventTypes { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExperimentStatus) DeepCopyInto(out *ExperimentStatus) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExperimentStatus. +func (in *ExperimentStatus) DeepCopy() *ExperimentStatus { + if in == nil { + return nil + } + out := new(ExperimentStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ExternalMetricsServerFeatureConfig) DeepCopyInto(out *ExternalMetricsServerFeatureConfig) { *out = *in diff --git a/api/datadoghq/v2alpha1/zz_generated.openapi.go b/api/datadoghq/v2alpha1/zz_generated.openapi.go index d1672fbc0..ad30eeedb 100644 --- a/api/datadoghq/v2alpha1/zz_generated.openapi.go +++ b/api/datadoghq/v2alpha1/zz_generated.openapi.go @@ -34,6 +34,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DogstatsdFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_DogstatsdFeatureConfig(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ErrorTrackingStandalone": schema_datadog_operator_api_datadoghq_v2alpha1_ErrorTrackingStandalone(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.EventCollectionFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_EventCollectionFeatureConfig(ref), + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ExperimentStatus": schema_datadog_operator_api_datadoghq_v2alpha1_ExperimentStatus(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.FIPSConfig": schema_datadog_operator_api_datadoghq_v2alpha1_FIPSConfig(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.HelmCheckFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_HelmCheckFeatureConfig(ref), "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.KubeStateMetricsCoreFeatureConfig": schema_datadog_operator_api_datadoghq_v2alpha1_KubeStateMetricsCoreFeatureConfig(ref), @@ -645,11 +646,17 @@ func schema_datadog_operator_api_datadoghq_v2alpha1_DatadogAgentStatus(ref commo Ref: ref("github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.RemoteConfigConfiguration"), }, }, + "experiment": { + SchemaProps: spec.SchemaProps{ + Description: "Experiment tracks the state of an active or recent Fleet Automation experiment.", + Ref: 
ref("github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ExperimentStatus"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DaemonSetStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DeploymentStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.RemoteConfigConfiguration", "k8s.io/apimachinery/pkg/apis/meta/v1.Condition"}, + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DaemonSetStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.DeploymentStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.ExperimentStatus", "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1.RemoteConfigConfiguration", "k8s.io/apimachinery/pkg/apis/meta/v1.Condition"}, } } @@ -1114,6 +1121,40 @@ func schema_datadog_operator_api_datadoghq_v2alpha1_EventCollectionFeatureConfig } } +func schema_datadog_operator_api_datadoghq_v2alpha1_ExperimentStatus(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "ExperimentStatus tracks the state of a Fleet Automation experiment.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "phase": { + SchemaProps: spec.SchemaProps{ + Description: "Phase is the current state of the experiment.", + Type: []string{"string"}, + Format: "", + }, + }, + "id": { + SchemaProps: spec.SchemaProps{ + Description: "ID is the unique experiment ID sent by Fleet Automation.", + Type: []string{"string"}, + Format: "", + }, + }, + "generation": { + SchemaProps: spec.SchemaProps{ + Description: "Generation is the DDA metadata.generation recorded when the experiment started. 
Used to detect manual spec changes during an experiment.",
+							Type:        []string{"integer"},
+							Format:      "int64",
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
 func schema_datadog_operator_api_datadoghq_v2alpha1_FIPSConfig(ref common.ReferenceCallback) common.OpenAPIDefinition {
 	return common.OpenAPIDefinition{
 		Schema: spec.Schema{
diff --git a/cmd/fleet-test/main.go b/cmd/fleet-test/main.go
new file mode 100644
index 000000000..1b97cc769
--- /dev/null
+++ b/cmd/fleet-test/main.go
@@ -0,0 +1,154 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+// Command fleet-test calls the fleet daemon's experiment functions directly
+// against a cluster, without a Remote Config client.
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"os"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+
+	"github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1"
+	"github.com/DataDog/datadog-operator/pkg/fleet"
+)
+
+func main() {
+	var (
+		action       string
+		experimentID string
+		namespace    string
+		name         string
+		patch        string
+	)
+
+	flag.StringVar(&action, "action", "", "start, stop, promote, or status")
+	flag.StringVar(&experimentID, "id", "", "experiment ID")
+	flag.StringVar(&namespace, "namespace", "datadog", "DDA namespace")
+	flag.StringVar(&name, "name", "datadog", "DDA name")
+	flag.StringVar(&patch, "patch", "", "JSON merge patch for start")
+	flag.Parse()
+
+	// Validate the arguments before connecting to the cluster.
+	switch action {
+	case "":
+		fmt.Fprintln(os.Stderr, "Usage: fleet-test -action start|stop|promote|status [-id ID] [-patch JSON]")
+		os.Exit(1)
+	case "start":
+		if patch == "" {
+			fmt.Fprintln(os.Stderr, "Error: -patch is required for start")
+			os.Exit(1)
+		}
+		requireID(experimentID)
+	case "stop", "promote":
+		requireID(experimentID)
+	case "status":
+		// No further arguments required.
+	default:
+		fmt.Fprintf(os.Stderr, "Unknown action: %s\n", action)
+		os.Exit(1)
+	}
+
+	ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
+	logger := ctrl.Log.WithName("fleet-test")
+
+	// Build K8s client
+	scheme := runtime.NewScheme()
+	if err := v2alpha1.AddToScheme(scheme); err != nil {
+		logger.Error(err, "Failed to register DatadogAgent types")
+		os.Exit(1)
+	}
+
+	cfg, err := ctrl.GetConfig()
+	if err != nil {
+		logger.Error(err, "Failed to get kubeconfig")
+		os.Exit(1)
+	}
+
+	kubeClient, err := client.New(cfg, client.Options{Scheme: scheme})
+	if err != nil {
+		logger.Error(err, "Failed to create K8s client")
+		os.Exit(1)
+	}
+
+	ctx := context.Background()
+	target := types.NamespacedName{Namespace: namespace, Name: name}
+
+	if action == "status" {
+		dda := &v2alpha1.DatadogAgent{}
+		if err := kubeClient.Get(ctx, target, dda); err != nil {
+			logger.Error(err, "Failed to get DDA")
+			os.Exit(1)
+		}
+		if dda.Status.Experiment == nil {
+			fmt.Println("No experiment running")
+		} else {
+			printExperiment("Experiment", dda.Status.Experiment)
+		}
+		fmt.Printf("APM enabled: %v\n", dda.Spec.Features != nil && dda.Spec.Features.APM != nil && dda.Spec.Features.APM.Enabled != nil && *dda.Spec.Features.APM.Enabled)
+		return
+	}
+
+	// Build the daemon (no RC client needed for direct testing)
+	daemon := fleet.NewDaemonForTesting(logger, kubeClient)
+
+	switch action {
+	case "start":
+		// Build installer config with the patch
+		configID := "test-config-" + experimentID
+		daemon.InjectConfig(configID, namespace, name, json.RawMessage(patch))
+
+		fmt.Printf("Starting experiment %s on %s/%s\n", experimentID, namespace, name)
+		err = daemon.StartExperiment(ctx, experimentID, configID)
+	case "stop":
+		fmt.Printf("Stopping experiment %s\n", experimentID)
+		err = daemon.StopExperiment(ctx, experimentID)
+	case "promote":
+		fmt.Printf("Promoting experiment %s\n", experimentID)
+		err = daemon.PromoteExperiment(ctx, experimentID)
+	}
+	if err != nil {
+		logger.Error(err, "Failed", "action", action)
+		os.Exit(1)
+	}
+	fmt.Printf("Success: %s experiment %s\n", action, experimentID)
+
+	// Print current state
+	dda := &v2alpha1.DatadogAgent{}
+	if err := kubeClient.Get(ctx, target, dda); err != nil {
+		logger.Error(err, "Failed to get DDA after the action succeeded")
+		return
+	}
+	if dda.Status.Experiment != nil {
+		printExperiment("Experiment status", dda.Status.Experiment)
+	}
+}
+
+// requireID exits with an error when no experiment ID was given.
+func requireID(experimentID string) {
+	if experimentID == "" {
+		fmt.Fprintln(os.Stderr, "Error: -id is required")
+		os.Exit(1)
+	}
+}
+
+// printExperiment prints the experiment status as indented JSON under heading.
+func printExperiment(heading string, experiment *v2alpha1.ExperimentStatus) {
+	data, err := json.MarshalIndent(experiment, "", " ")
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Error: encoding experiment status: %v\n", err)
+		return
+	}
+	fmt.Printf("%s:\n%s\n", heading, data)
+}
diff --git a/cmd/main.go b/cmd/main.go
index 8325f079f..5b9ee9d6c 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -666,6 +666,6 @@ func setupAndStartHelmMetadataForwarder(logger logr.Logger, mgr manager.Manager,
 }
 
 func setupFleetDaemon(logger logr.Logger, mgr manager.Manager, rcClient remoteconfig.RCClient) error {
-	daemon := fleet.NewDaemon(logger.WithName("fleet"), rcClient)
+	daemon := fleet.NewDaemon(logger.WithName("fleet"), rcClient, mgr.GetClient())
 	return mgr.Add(daemon)
 }
diff --git a/config/crd/bases/v1/datadoghq.com_datadogagents.yaml b/config/crd/bases/v1/datadoghq.com_datadogagents.yaml
index 032573381..882a2b394 100644
--- a/config/crd/bases/v1/datadoghq.com_datadogagents.yaml
+++ b/config/crd/bases/v1/datadoghq.com_datadogagents.yaml
@@ -8474,6 +8474,29 @@ spec:
                 x-kubernetes-list-map-keys:
                 - type
                 x-kubernetes-list-type: map
+              experiment:
+                description: Experiment tracks the state of an active or recent Fleet Automation experiment.
+                properties:
+                  generation:
+                    description: |-
+                      Generation is the DDA metadata.generation recorded when the experiment started.
+                      Used to detect manual spec changes during an experiment.
+                    format: int64
+                    type: integer
+                  id:
+                    description: ID is the unique experiment ID sent by Fleet Automation.
+                    type: string
+                  phase:
+                    description: Phase is the current state of the experiment.
+                    enum:
+                    - running
+                    - stopped
+                    - rollback
+                    - timeout
+                    - promoted
+                    - aborted
+                    type: string
+                type: object
               otelAgentGateway:
                 description: The actual state of the OTel Agent Gateway as a deployment.
properties: diff --git a/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json b/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json index a1e193416..282f9e1dd 100644 --- a/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json +++ b/config/crd/bases/v1/datadoghq.com_datadogagents_v2alpha1.json @@ -8175,6 +8175,34 @@ ], "x-kubernetes-list-type": "map" }, + "experiment": { + "additionalProperties": false, + "description": "Experiment tracks the state of an active or recent Fleet Automation experiment.", + "properties": { + "generation": { + "description": "Generation is the DDA metadata.generation recorded when the experiment started.\nUsed to detect manual spec changes during an experiment.", + "format": "int64", + "type": "integer" + }, + "id": { + "description": "ID is the unique experiment ID sent by Fleet Automation.", + "type": "string" + }, + "phase": { + "description": "Phase is the current state of the experiment.", + "enum": [ + "running", + "stopped", + "rollback", + "timeout", + "promoted", + "aborted" + ], + "type": "string" + } + }, + "type": "object" + }, "otelAgentGateway": { "additionalProperties": false, "description": "The actual state of the OTel Agent Gateway as a deployment.", diff --git a/internal/controller/datadogagent/controller_reconcile_v2_common.go b/internal/controller/datadogagent/controller_reconcile_v2_common.go index ef70322ea..89deeee53 100644 --- a/internal/controller/datadogagent/controller_reconcile_v2_common.go +++ b/internal/controller/datadogagent/controller_reconcile_v2_common.go @@ -826,5 +826,9 @@ func IsEqualStatus(current *v2alpha1.DatadogAgentStatus, newStatus *v2alpha1.Dat return false } + if !apiequality.Semantic.DeepEqual(current.Experiment, newStatus.Experiment) { + return false + } + return condition.IsEqualConditions(current.Conditions, newStatus.Conditions) } diff --git a/internal/controller/datadogagent/controller_reconcile_v2_helpers.go 
b/internal/controller/datadogagent/controller_reconcile_v2_helpers.go index cdbf084c3..232b39c2f 100644 --- a/internal/controller/datadogagent/controller_reconcile_v2_helpers.go +++ b/internal/controller/datadogagent/controller_reconcile_v2_helpers.go @@ -259,6 +259,9 @@ func generateNewStatusFromDDA(ddaStatus *datadoghqv2alpha1.DatadogAgentStatus) * if ddaStatus.RemoteConfigConfiguration != nil { status.RemoteConfigConfiguration = ddaStatus.RemoteConfigConfiguration } + if ddaStatus.Experiment != nil { + status.Experiment = ddaStatus.Experiment.DeepCopy() + } } return status } diff --git a/pkg/fleet/daemon.go b/pkg/fleet/daemon.go index 706e4aeb1..5bb3cace6 100644 --- a/pkg/fleet/daemon.go +++ b/pkg/fleet/daemon.go @@ -12,6 +12,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/remoteconfig/state" "github.com/go-logr/logr" + kubeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/DataDog/datadog-operator/pkg/remoteconfig" @@ -29,17 +30,19 @@ var _ manager.LeaderElectionRunnable = &Daemon{} // Daemon subscribes to fleet-specific RC products (installer configs and tasks) // and runs after leader election as a controller-runtime Runnable. type Daemon struct { - logger logr.Logger - rcClient remoteconfig.RCClient + logger logr.Logger + rcClient remoteconfig.RCClient + kubeClient kubeclient.Client mu sync.RWMutex configs map[string]installerConfig // keyed by config ID; replaced on each RC update } // NewDaemon creates a new Fleet Daemon. 
-func NewDaemon(logger logr.Logger, rcClient remoteconfig.RCClient) *Daemon {
+func NewDaemon(logger logr.Logger, rcClient remoteconfig.RCClient, kubeClient kubeclient.Client) *Daemon {
 	return &Daemon{
-		logger:   logger,
-		rcClient: rcClient,
+		logger:     logger,
+		rcClient:   rcClient,
+		kubeClient: kubeClient,
 		configs:  make(map[string]installerConfig),
 	}
 }
@@ -103,25 +106,2 @@ func (d *Daemon) handleRemoteAPIRequest(req remoteAPIRequest) error {
 	}
 }
-
-func (d *Daemon) startDatadogAgentExperiment(req remoteAPIRequest) error {
-	cfg, err := d.getConfig(req.ExpectedState.ExperimentConfig)
-	if err != nil {
-		return fmt.Errorf("start DatadogAgent experiment: %w", err)
-	}
-	d.logger.Info("Starting DatadogAgent experiment", "id", req.ID, "package", req.Package, "config_id", cfg.ID)
-	return nil
-}
-
-func (d *Daemon) stopDatadogAgentExperiment(req remoteAPIRequest) error {
-	d.logger.Info("Stopping DatadogAgent experiment", "id", req.ID, "package", req.Package)
-	return nil
-}
-
-func (d *Daemon) promoteDatadogAgentExperiment(req remoteAPIRequest) error {
-	cfg, err := d.getConfig(req.ExpectedState.ExperimentConfig)
-	if err != nil {
-		return fmt.Errorf("promote DatadogAgent experiment: %w", err)
-	}
-	d.logger.Info("Promoting DatadogAgent experiment", "id", req.ID, "package", req.Package, "config_id", cfg.ID)
-	return nil
-}
diff --git a/pkg/fleet/experiment.go b/pkg/fleet/experiment.go
new file mode 100644
index 000000000..18358678a
--- /dev/null
+++ b/pkg/fleet/experiment.go
@@ -0,0 +1,242 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package fleet
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+
+	apiequality "k8s.io/apimachinery/pkg/api/equality"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	kubeclient "sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1"
+)
+
+// fleetManagementOperation is a single fleet operation for config management.
+type fleetManagementOperation struct {
+	Operation        operation               `json:"operation"`
+	GroupVersionKind schema.GroupVersionKind `json:"group_version_kind"`
+	NamespacedName   types.NamespacedName    `json:"namespaced_name"`
+	Config           json.RawMessage         `json:"config"`
+}
+
+// operation names the action a fleetManagementOperation performs.
+type operation string
+
+const (
+	operationCreate operation = "create"
+	operationUpdate operation = "update"
+	operationDelete operation = "delete"
+)
+
+// extractDDAPatch extracts the first update operation targeting a DatadogAgent
+// from the installer config. Returns the target NamespacedName and the raw
+// JSON merge patch, or an error if no matching operation is found.
+func extractDDAPatch(cfg installerConfig) (types.NamespacedName, json.RawMessage, error) {
+	for _, op := range cfg.Operations {
+		if op.Operation == operationUpdate && op.GroupVersionKind.Kind == "DatadogAgent" {
+			return op.NamespacedName, op.Config, nil
+		}
+	}
+	return types.NamespacedName{}, nil, fmt.Errorf("no DatadogAgent update operation found in config %s", cfg.ID)
+}
+
+// startDatadogAgentExperiment starts the experiment described by req. It marks
+// the target DatadogAgent as running the experiment, then applies the patch from
+// the installer config referenced by req to the DatadogAgent spec.
+//
+// It returns an error if req has no ID, if the config or the DatadogAgent cannot
+// be found, or if the DatadogAgent already has an experiment in the running,
+// rollback, or stopped phase.
+func (d *Daemon) startDatadogAgentExperiment(req remoteAPIRequest) error {
+	ctx := context.TODO()
+
+	if req.ID == "" {
+		return fmt.Errorf("experiment task missing ID")
+	}
+
+	cfg, err := d.getConfig(req.ExpectedState.ExperimentConfig)
+	if err != nil {
+		return fmt.Errorf("start experiment: %w", err)
+	}
+
+	target, patch, err := extractDDAPatch(cfg)
+	if err != nil {
+		return fmt.Errorf("start experiment: %w", err)
+	}
+
+	// Get the target DDA
+	dda := &v2alpha1.DatadogAgent{}
+	if err := d.kubeClient.Get(ctx, target, dda); err != nil {
+		return fmt.Errorf("start experiment: failed to get DDA %s: %w", target, err)
+	}
+
+	// Guard: reject if an active experiment is in progress
+	if dda.Status.Experiment != nil {
+		phase := dda.Status.Experiment.Phase
+		switch phase {
+		case v2alpha1.ExperimentPhaseRunning, v2alpha1.ExperimentPhaseRollback, v2alpha1.ExperimentPhaseStopped:
+			return fmt.Errorf("start experiment: experiment %s is active (phase=%s)", dda.Status.Experiment.ID, phase)
+		}
+	}
+
+	// Step 1: Apply JSON merge patch to build the target spec
+	patchedDDA := dda.DeepCopy()
+	if err := json.Unmarshal(patch, patchedDDA); err != nil {
+		return fmt.Errorf("start experiment: failed to apply merge patch: %w", err)
+	}
+
+	// Step 2: Set experiment status
+	if err := d.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{
+		Phase: v2alpha1.ExperimentPhaseRunning,
+		ID:    req.ID,
+	}); err != nil {
+		return fmt.Errorf("start experiment: failed to set status: %w", err)
+	}
+
+	// Step 3: Re-fetch for fresh resourceVersion, apply patched spec
+	refreshed := &v2alpha1.DatadogAgent{}
+	if err := d.kubeClient.Get(ctx, target, refreshed); err != nil {
+		return fmt.Errorf("start experiment: failed to re-fetch DDA: %w", err)
+	}
+	refreshed.Spec = patchedDDA.Spec
+	if patchedDDA.Annotations != nil {
+		refreshed.Annotations = patchedDDA.Annotations
+	}
+	if err := d.kubeClient.Update(ctx, refreshed); err != nil {
+		return fmt.Errorf("start experiment: failed to update DDA spec: %w", err)
+	}
+
+	// Step 4: Update experiment generation to post-patch value
+	if err := d.kubeClient.Get(ctx, target, refreshed); err != nil {
+		return fmt.Errorf("start experiment: failed to re-fetch for generation: %w", err)
+	}
+	if refreshed.Status.Experiment != nil {
+		refreshed.Status.Experiment.Generation = refreshed.Generation
+		if err := d.kubeClient.Status().Update(ctx, refreshed); err != nil {
+			if !apierrors.IsConflict(err) {
+				return fmt.Errorf("start experiment: failed to update generation: %w", err)
+			}
+			d.logger.Info("Generation update conflicted, will be set on next reconcile")
+		}
+	}
+
+	d.logger.Info("Started experiment", "id", req.ID, "target", target)
+	return nil
+}
+
+// stopDatadogAgentExperiment moves the running experiment matching req.ID to
+// the stopped phase. A request that matches no running experiment is ignored.
+func (d *Daemon) stopDatadogAgentExperiment(req remoteAPIRequest) error {
+	ctx := context.TODO()
+
+	dda, err := d.findRunningExperiment(ctx, req.ID)
+	if err != nil {
+		return fmt.Errorf("stop experiment: %w", err)
+	}
+	if dda == nil {
+		d.logger.Info("Ignoring stop: no matching running experiment", "taskID", req.ID)
+		return nil
+	}
+
+	if err := d.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{
+		Phase:      v2alpha1.ExperimentPhaseStopped,
+		ID:         dda.Status.Experiment.ID,
+		Generation: dda.Status.Experiment.Generation,
+	}); err != nil {
+		return fmt.Errorf("stop experiment: %w", err)
+	}
+
+	d.logger.Info("Stopped experiment", "id", req.ID)
+	return nil
+}
+
+// promoteDatadogAgentExperiment moves the running experiment matching req.ID
+// to the promoted phase. A request that matches no running experiment is ignored.
+func (d *Daemon) promoteDatadogAgentExperiment(req remoteAPIRequest) error {
+	ctx := context.TODO()
+
+	dda, err := d.findRunningExperiment(ctx, req.ID)
+	if err != nil {
+		return fmt.Errorf("promote experiment: %w", err)
+	}
+	if dda == nil {
+		d.logger.Info("Ignoring promote: no matching running experiment", "taskID", req.ID)
+		return nil
+	}
+
+	// NOTE(review): unlike stop, promote does not carry Generation over to the
+	// new status — confirm this is intended.
+	if err := d.setExperimentStatus(ctx, dda, &v2alpha1.ExperimentStatus{
+		Phase: v2alpha1.ExperimentPhasePromoted,
+		ID:    dda.Status.Experiment.ID,
+	}); err != nil {
+		return fmt.Errorf("promote experiment: %w", err)
+	}
+
+	d.logger.Info("Promoted experiment", "id", req.ID)
+	return nil
+}
+
+// findRunningExperiment returns the DatadogAgent whose status holds a running
+// experiment matching taskID, or nil if there is none. An empty taskID, or an
+// experiment recorded without an ID, matches any running experiment.
+func (d *Daemon) findRunningExperiment(ctx context.Context, taskID string) (*v2alpha1.DatadogAgent, error) {
+	ddaList := &v2alpha1.DatadogAgentList{}
+	if err := d.kubeClient.List(ctx, ddaList); err != nil {
+		return nil, fmt.Errorf("failed to list DDAs: %w", err)
+	}
+
+	// Check every DatadogAgent rather than only the first one listed: the
+	// experiment may be running on any of them.
+	for i := range ddaList.Items {
+		dda := &ddaList.Items[i]
+		experiment := dda.Status.Experiment
+		if experiment == nil || experiment.Phase != v2alpha1.ExperimentPhaseRunning {
+			continue
+		}
+		if experiment.ID != "" && taskID != "" && experiment.ID != taskID {
+			continue
+		}
+		return dda, nil
+	}
+	return nil, nil
+}
+
+// setExperimentStatus writes experiment to the status of dda, re-fetching dda
+// and retrying when the update conflicts. It does nothing when the status
+// already equals the requested one. On success dda.Status holds the new status.
+func (d *Daemon) setExperimentStatus(ctx context.Context, dda *v2alpha1.DatadogAgent, experiment *v2alpha1.ExperimentStatus) error {
+	const maxRetries = 3
+	for i := range maxRetries {
+		newStatus := dda.Status.DeepCopy()
+		newStatus.Experiment = experiment
+
+		if apiequality.Semantic.DeepEqual(&dda.Status, newStatus) {
+			return nil
+		}
+
+		ddaUpdate := dda.DeepCopy()
+		ddaUpdate.Status = *newStatus
+		updateErr := d.kubeClient.Status().Update(ctx, ddaUpdate)
+		if updateErr == nil {
+			dda.Status = *newStatus
+			return nil
+		}
+		if !apierrors.IsConflict(updateErr) {
+			return updateErr
+		}
+		d.logger.Info("Status update conflict, retrying", "attempt", i+1)
+		if getErr := d.kubeClient.Get(ctx, kubeclient.ObjectKeyFromObject(dda), dda); getErr != nil {
+			return fmt.Errorf("re-fetch after conflict: %w", getErr)
+		}
+	}
+	return fmt.Errorf("failed to update experiment status after %d retries", maxRetries)
+}
diff --git a/pkg/fleet/experiment_test.go b/pkg/fleet/experiment_test.go
new file mode 100644
index
000000000..7aa510176
--- /dev/null
+++ b/pkg/fleet/experiment_test.go
@@ -0,0 +1,228 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package fleet
+
+import (
+	"context"
+	"encoding/json"
+	"testing"
+
+	"github.com/go-logr/logr"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+
+	"github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1"
+	apiutils "github.com/DataDog/datadog-operator/api/utils"
+)
+
+func init() {
+	logf.SetLogger(zap.New(zap.UseDevMode(true)))
+}
+
+// testDaemon returns a Daemon backed by a fake client that holds objs and
+// serves the DatadogAgent status subresource.
+func testDaemon(objs ...runtime.Object) *Daemon {
+	s := runtime.NewScheme()
+	if err := v2alpha1.AddToScheme(s); err != nil {
+		panic(err)
+	}
+	kubeClient := fake.NewClientBuilder().
+		WithScheme(s).
+		WithRuntimeObjects(objs...).
+		WithStatusSubresource(&v2alpha1.DatadogAgent{}).
+		Build()
+	return &Daemon{
+		logger:     logr.New(logf.NullLogSink{}),
+		kubeClient: kubeClient,
+		configs:    make(map[string]installerConfig),
+	}
+}
+
+// baseDDA returns the DatadogAgent datadog/datadog-agent with APM disabled and NPM enabled.
+func baseDDA() *v2alpha1.DatadogAgent {
+	return &v2alpha1.DatadogAgent{
+		TypeMeta: metav1.TypeMeta{
+			Kind:       "DatadogAgent",
+			APIVersion: "datadoghq.com/v2alpha1",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "datadog-agent",
+			Namespace: "datadog",
+		},
+		Spec: v2alpha1.DatadogAgentSpec{
+			Features: &v2alpha1.DatadogFeatures{
+				APM: &v2alpha1.APMFeatureConfig{
+					Enabled: apiutils.NewBoolPointer(false),
+				},
+				NPM: &v2alpha1.NPMFeatureConfig{
+					Enabled: apiutils.NewBoolPointer(true),
+				},
+			},
+		},
+	}
+}
+
+// makeConfig returns an installer config whose single operation enables APM on datadog/datadog-agent.
+func makeConfig(id string) installerConfig {
+	return installerConfig{
+		ID: id,
+		Operations: []fleetManagementOperation{
+			{
+				Operation:        operationUpdate,
+				GroupVersionKind: schema.GroupVersionKind{Group: "datadoghq.com", Version: "v2alpha1", Kind: "DatadogAgent"},
+				NamespacedName:   types.NamespacedName{Namespace: "datadog", Name: "datadog-agent"},
+				Config:           json.RawMessage(`{"spec":{"features":{"apm":{"enabled":true}}}}`),
+			},
+		},
+	}
+}
+
+// getDDAFromDaemon fetches datadog/datadog-agent through the daemon's client, failing the test on error.
+func getDDAFromDaemon(t *testing.T, d *Daemon) *v2alpha1.DatadogAgent {
+	t.Helper()
+	dda := &v2alpha1.DatadogAgent{}
+	err := d.kubeClient.Get(context.TODO(), types.NamespacedName{Name: "datadog-agent", Namespace: "datadog"}, dda)
+	require.NoError(t, err)
+	return dda
+}
+
+func TestExtractDDAPatch_Success(t *testing.T) {
+	cfg := makeConfig("cfg-1")
+	target, patch, err := extractDDAPatch(cfg)
+	require.NoError(t, err)
+	assert.Equal(t, "datadog", target.Namespace)
+	assert.Equal(t, "datadog-agent", target.Name)
+	assert.NotEmpty(t, patch)
+}
+
+func TestExtractDDAPatch_NoMatch(t *testing.T) {
+	cfg := installerConfig{ID: "cfg-1"}
+	_, _, err := extractDDAPatch(cfg)
+	require.Error(t, err)
+}
+
+func TestStartExperiment_Success(t *testing.T) {
+	dda := baseDDA()
+	d := testDaemon(dda)
+	d.configs["cfg-1"] = makeConfig("cfg-1")
+
+	err := d.startDatadogAgentExperiment(remoteAPIRequest{
+		ID:            "exp-001",
+		ExpectedState: expectedState{ExperimentConfig: "cfg-1"},
+	})
+	require.NoError(t, err)
+
+	updated := getDDAFromDaemon(t, d)
+	assert.True(t, apiutils.BoolValue(updated.Spec.Features.APM.Enabled))
+	assert.True(t, apiutils.BoolValue(updated.Spec.Features.NPM.Enabled), "NPM preserved")
+	require.NotNil(t, updated.Status.Experiment)
+	assert.Equal(t, v2alpha1.ExperimentPhaseRunning, updated.Status.Experiment.Phase)
+	assert.Equal(t, "exp-001", updated.Status.Experiment.ID)
+}
+
+func TestStartExperiment_MissingConfig(t *testing.T) {
+	d := testDaemon(baseDDA())
+	err := d.startDatadogAgentExperiment(remoteAPIRequest{
+		ID:            "exp-001",
+		ExpectedState: expectedState{ExperimentConfig: "nonexistent"},
+	})
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "not found")
+}
+
+func TestStartExperiment_MissingID(t *testing.T) {
+	d := testDaemon(baseDDA())
+	err := d.startDatadogAgentExperiment(remoteAPIRequest{ID: ""})
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "missing ID")
+}
+
+func TestStartExperiment_DDANotFound(t *testing.T) {
+	d := testDaemon() // no DDA
+	d.configs["cfg-1"] = makeConfig("cfg-1")
+	err := d.startDatadogAgentExperiment(remoteAPIRequest{
+		ID:            "exp-001",
+		ExpectedState: expectedState{ExperimentConfig: "cfg-1"},
+	})
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "failed to get DDA")
+}
+
+func TestStartExperiment_AlreadyRunning(t *testing.T) {
+	dda := baseDDA()
+	dda.Status.Experiment = &v2alpha1.ExperimentStatus{Phase: v2alpha1.ExperimentPhaseRunning, ID: "exp-001"}
+	d := testDaemon(dda)
+	d.configs["cfg-1"] = makeConfig("cfg-1")
+	err := d.startDatadogAgentExperiment(remoteAPIRequest{
+		ID:            "exp-002",
+		ExpectedState: expectedState{ExperimentConfig: "cfg-1"},
+	})
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "active")
+}
+
+func TestStartExperiment_AfterAborted(t *testing.T) {
+	dda := baseDDA()
+	dda.Status.Experiment = &v2alpha1.ExperimentStatus{Phase: v2alpha1.ExperimentPhaseAborted, ID: "exp-001"}
+	d := testDaemon(dda)
+	d.configs["cfg-1"] = makeConfig("cfg-1")
+	err := d.startDatadogAgentExperiment(remoteAPIRequest{
+		ID:            "exp-002",
+		ExpectedState: expectedState{ExperimentConfig: "cfg-1"},
+	})
+	require.NoError(t, err)
+	updated := getDDAFromDaemon(t, d)
+	assert.Equal(t, "exp-002", updated.Status.Experiment.ID)
+}
+
+func TestStopExperiment_Running(t *testing.T) {
+	dda := baseDDA()
+	dda.Status.Experiment = &v2alpha1.ExperimentStatus{Phase: v2alpha1.ExperimentPhaseRunning, ID: "exp-001", Generation: 1}
+	d := testDaemon(dda)
+	err := d.stopDatadogAgentExperiment(remoteAPIRequest{ID: "exp-001"})
+	require.NoError(t, err)
+	updated := getDDAFromDaemon(t, d)
+	assert.Equal(t, v2alpha1.ExperimentPhaseStopped, updated.Status.Experiment.Phase)
+}
+
+func TestStopExperiment_NoRunning(t *testing.T) {
+	d := testDaemon(baseDDA())
+	err := d.stopDatadogAgentExperiment(remoteAPIRequest{ID: "exp-001"})
+	require.NoError(t, err)
+}
+
+func TestStopExperiment_IDMismatch(t *testing.T) {
+	dda := baseDDA()
+	dda.Status.Experiment = &v2alpha1.ExperimentStatus{Phase: v2alpha1.ExperimentPhaseRunning, ID: "exp-001"}
+	d := testDaemon(dda)
+	err := d.stopDatadogAgentExperiment(remoteAPIRequest{ID: "exp-999"})
+	require.NoError(t, err)
+	updated := getDDAFromDaemon(t, d)
+	assert.Equal(t, v2alpha1.ExperimentPhaseRunning, updated.Status.Experiment.Phase)
+}
+
+func TestPromoteExperiment_Running(t *testing.T) {
+	dda := baseDDA()
+	dda.Status.Experiment = &v2alpha1.ExperimentStatus{Phase: v2alpha1.ExperimentPhaseRunning, ID: "exp-001"}
+	d := testDaemon(dda)
+	err := d.promoteDatadogAgentExperiment(remoteAPIRequest{ID: "exp-001"})
+	require.NoError(t, err)
+	updated := getDDAFromDaemon(t, d)
+	assert.Equal(t, v2alpha1.ExperimentPhasePromoted, updated.Status.Experiment.Phase)
+}
+
+func TestPromoteExperiment_NoRunning(t *testing.T) {
+	d := testDaemon(baseDDA())
+	err := d.promoteDatadogAgentExperiment(remoteAPIRequest{ID: "exp-001"})
+	require.NoError(t, err)
+}
diff --git a/pkg/fleet/remote_config.go b/pkg/fleet/remote_config.go
index 2acee81fd..f46b35de4 100644
--- a/pkg/fleet/remote_config.go
+++ b/pkg/fleet/remote_config.go
@@ -16,6 +16,7 @@ import (
 type installerConfig struct {
 	ID             string                         `json:"id"`
 	FileOperations []installerConfigFileOperation `json:"file_operations"`
+	Operations     []fleetManagementOperation     `json:"operations"`
 }
 
 // installerConfigFileOperation is a single file operation in an installerConfig.
diff --git a/pkg/fleet/testing.go b/pkg/fleet/testing.go
new file mode 100644
index 000000000..1f1590cda
--- /dev/null
+++ b/pkg/fleet/testing.go
@@ -0,0 +1,86 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package fleet
+
+import (
+	"context"
+	"encoding/json"
+
+	"github.com/go-logr/logr"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	kubeclient "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// NewDaemonForTesting creates a Daemon with a K8s client but no RC client.
+// Used by the fleet-test CLI to exercise experiment functions directly.
+func NewDaemonForTesting(logger logr.Logger, kubeClient kubeclient.Client) *Daemon {
+	return &Daemon{
+		logger:     logger,
+		kubeClient: kubeClient,
+		configs:    make(map[string]installerConfig),
+	}
+}
+
+// InjectConfig adds a test config that patches the specified DDA.
+func (d *Daemon) InjectConfig(configID, namespace, name string, patch json.RawMessage) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	d.configs[configID] = installerConfig{
+		ID: configID,
+		Operations: []fleetManagementOperation{
+			{
+				Operation:        operationUpdate,
+				GroupVersionKind: schema.GroupVersionKind{Group: "datadoghq.com", Version: "v2alpha1", Kind: "DatadogAgent"},
+				NamespacedName:   types.NamespacedName{Namespace: namespace, Name: name},
+				Config:           patch,
+			},
+		},
+	}
+}
+
+// StartExperiment starts an experiment via the daemon.
+//
+// ctx is only checked for cancellation before the request is dispatched: the
+// handler it calls does not take a context.
+func (d *Daemon) StartExperiment(ctx context.Context, experimentID, configID string) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	return d.startDatadogAgentExperiment(remoteAPIRequest{
+		ID:            experimentID,
+		Method:        methodStartDatadogAgentExperiment,
+		ExpectedState: expectedState{ExperimentConfig: configID},
+	})
+}
+
+// StopExperiment stops an experiment via the daemon.
+//
+// ctx is only checked for cancellation before the request is dispatched: the
+// handler it calls does not take a context.
+func (d *Daemon) StopExperiment(ctx context.Context, experimentID string) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	return d.stopDatadogAgentExperiment(remoteAPIRequest{
+		ID:     experimentID,
+		Method: methodStopDatadogAgentExperiment,
+	})
+}
+
+// PromoteExperiment promotes an experiment via the daemon.
+//
+// ctx is only checked for cancellation before the request is dispatched: the
+// handler it calls does not take a context.
+func (d *Daemon) PromoteExperiment(ctx context.Context, experimentID string) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	return d.promoteDatadogAgentExperiment(remoteAPIRequest{
+		ID:     experimentID,
+		Method: methodPromoteDatadogAgentExperiment,
+	})
+}