Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cmd/agent/common/autodiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"slices"
"time"

"github.com/DataDog/datadog-agent/pkg/util/flavor"
"go.uber.org/atomic"
utilserror "k8s.io/apimachinery/pkg/util/errors"

Expand Down Expand Up @@ -59,6 +60,16 @@ func setupAutoDiscovery(confSearchPaths []string, wmeta workloadmeta.Component,
time.Duration(pkgconfigsetup.Datadog().GetInt("autoconf_config_files_poll_interval"))*time.Second,
)

crdCheckEnabled := pkgconfigsetup.Datadog().GetBool("workload_config.enabled")
if crdCheckEnabled && flavor.GetFlavor() != flavor.ClusterAgent {
pollInterval := time.Duration(pkgconfigsetup.Datadog().GetInt("autoconf_crd_checks_poll_interval")) * time.Second
ac.AddConfigProvider(
providers.NewCRDFileConfigProvider(pkgconfigsetup.Datadog().GetString("autoconf_crd_checks_dir"), providers.DefaultCRDNameExtractor, acTelemetryStore),
true,
pollInterval,
)
}

// Autodiscovery cannot easily use config.RegisterOverrideFunc() due to Unmarshalling
extraConfigProviders, extraConfigListeners := confad.DiscoverComponentsFromConfig()

Expand Down
1 change: 1 addition & 0 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func start(log log.Component,
DynamicInformerFactory: apiCl.DynamicInformerFactory,
Client: apiCl.InformerCl,
IsLeaderFunc: le.IsLeader,
LeaderNotifier: le.Subscribe,
EventRecorder: eventRecorder,
WorkloadMeta: wmeta,
StopCh: stopCh,
Expand Down
38 changes: 33 additions & 5 deletions comp/core/autodiscovery/listeners/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func (s *WorkloadService) FilterTemplates(configs map[string]integration.Config)
s.filterTemplatesOverriddenChecks(configs)
filterTemplatesMatched(s, configs)

// Provider precedence must run after matching so that a CRD file template
// rejected by workload filters does not suppress the file fallback.
s.filterTemplatesProviderPrecedence(configs)

// Container Collect All filtering should always be last
s.filterTemplatesContainerCollectAll(configs)
}
Expand Down Expand Up @@ -187,16 +191,16 @@ func (s *WorkloadService) filterTemplatesEmptyOverrides(configs map[string]integ
}
}

// filterTemplatesOverriddenChecks drops file-based templates if this service's
// labels/annotations specify a check of the same name.
// filterTemplatesOverriddenChecks drops file-based and CRD-file-based templates
// if this service's labels/annotations specify a check of the same name.
func (s *WorkloadService) filterTemplatesOverriddenChecks(configs map[string]integration.Config) {
for digest, config := range configs {
if config.Provider != names.File {
continue // only override file configs
if config.Provider != names.File && config.Provider != names.CRDFile {
continue // only override file and CRD file configs
}
for _, checkName := range s.checkNames {
if config.Name == checkName {
// Ignore config from file when the same check is activated on
// Ignore config from file/CRD file when the same check is activated on
// the same service via other config providers (k8s annotations
// or container labels)
log.Debugf("Ignoring config from %s: the service %s overrides check %s",
Expand All @@ -207,6 +211,30 @@ func (s *WorkloadService) filterTemplatesOverriddenChecks(configs map[string]int
}
}

// filterTemplatesProviderPrecedence enforces CRD-file-over-file precedence among
// the templates that survived workload matching. It must be called after
// filterTemplatesMatched so that a CRD file template rejected by workload
// filters does not incorrectly suppress the corresponding file template.
func (s *WorkloadService) filterTemplatesProviderPrecedence(configs map[string]integration.Config) {
crdFileNames := map[string]struct{}{}
for _, config := range configs {
if config.Provider == names.CRDFile {
crdFileNames[config.Name] = struct{}{}
}
}

for digest, config := range configs {
if config.Provider != names.File {
continue
}
if _, hasCRD := crdFileNames[config.Name]; hasCRD {
log.Debugf("Ignoring file config from %s: CRD file provider overrides check %s for service %s",
config.Source, config.Name, s.GetServiceID())
delete(configs, digest)
}
}
}

// filterTemplatesContainerCollectAll drops the container-collect-all template
// added by the config provider (AddContainerCollectAllConfigs) if the service
// has any other templates containing logs config.
Expand Down
56 changes: 56 additions & 0 deletions comp/core/autodiscovery/listeners/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func TestServiceFilterTemplatesOverriddenChecks(t *testing.T) {
entity := &workloadmeta.Container{EntityID: workloadmeta.EntityID{Kind: "container", ID: "testy"}}
fooTpl := integration.Config{Name: "foo", Provider: names.File, LogsConfig: []byte(`{"source":"foo"}`)}
barTpl := integration.Config{Name: "bar", Provider: names.File, LogsConfig: []byte(`{"source":"bar"}`)}
fooCRDTpl := integration.Config{Name: "foo", Provider: names.CRDFile, Instances: []integration.Data{[]byte(`{"crd":"foo"}`)}}
barCRDTpl := integration.Config{Name: "bar", Provider: names.CRDFile, Instances: []integration.Data{[]byte(`{"crd":"bar"}`)}}
fooNonFileTpl := integration.Config{Name: "foo", Provider: "xxx", LogsConfig: []byte(`{"source":"foo-nf"}`)}
barNonFileTpl := integration.Config{Name: "bar", Provider: "xxx", LogsConfig: []byte(`{"source":"bar-nf"}`)}
nothingDropped := []integration.Config{}
Expand All @@ -105,6 +107,60 @@ func TestServiceFilterTemplatesOverriddenChecks(t *testing.T) {
assert.Equal(t, []integration.Config{barTpl},
filterDrops(&WorkloadService{entity: entity, checkNames: []string{"bing", "bar"}}, fooTpl, barTpl, fooNonFileTpl, barNonFileTpl))
})

t.Run("CRD file template dropped by annotation checkName", func(t *testing.T) {
assert.Equal(t, []integration.Config{fooCRDTpl},
filterDrops(&WorkloadService{entity: entity, checkNames: []string{"foo"}}, fooCRDTpl, barTpl))
})

t.Run("both file and CRD file templates dropped by annotation checkNames", func(t *testing.T) {
assert.Equal(t, []integration.Config{fooTpl, fooCRDTpl, barCRDTpl},
filterDrops(&WorkloadService{entity: entity, checkNames: []string{"foo", "bar"}}, fooTpl, fooCRDTpl, barCRDTpl, fooNonFileTpl))
})

t.Run("non-file non-CRD template not dropped", func(t *testing.T) {
assert.Equal(t, nothingDropped,
filterDrops(&WorkloadService{entity: entity, checkNames: []string{"foo"}}, fooNonFileTpl))
})
}

func TestServiceFilterTemplatesProviderPrecedence(t *testing.T) {
filterDrops := func(svc *WorkloadService, configs ...integration.Config) []integration.Config {
return filterConfigsDropped(svc.filterTemplatesProviderPrecedence, configs...)
}

entity := &workloadmeta.Container{EntityID: workloadmeta.EntityID{Kind: "container", ID: "testy"}}
fooFile := integration.Config{Name: "foo", Provider: names.File, LogsConfig: []byte(`{"a":"file"}`)}
fooCRD := integration.Config{Name: "foo", Provider: names.CRDFile, Instances: []integration.Data{[]byte(`{}`)}}
barFile := integration.Config{Name: "bar", Provider: names.File, LogsConfig: []byte(`{"a":"bar"}`)}
nothingDropped := []integration.Config{}

svc := &WorkloadService{entity: entity}

t.Run("file only: nothing dropped", func(t *testing.T) {
assert.Equal(t, nothingDropped,
filterDrops(svc, fooFile))
})

t.Run("CRD only: nothing dropped", func(t *testing.T) {
assert.Equal(t, nothingDropped,
filterDrops(svc, fooCRD))
})

t.Run("CRD present: file dropped", func(t *testing.T) {
assert.Equal(t, []integration.Config{fooFile},
filterDrops(svc, fooFile, fooCRD))
})

t.Run("CRD for different name: unrelated file not dropped", func(t *testing.T) {
assert.Equal(t, nothingDropped,
filterDrops(svc, barFile, fooCRD))
})

t.Run("CRD for foo, file for bar: only foo file dropped", func(t *testing.T) {
assert.Equal(t, []integration.Config{fooFile},
filterDrops(svc, fooFile, fooCRD, barFile))
})
}

func TestServiceFilterTemplatesCCA(t *testing.T) {
Expand Down
134 changes: 134 additions & 0 deletions comp/core/autodiscovery/providers/crd_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package providers

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/types"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/log"
)

// NameExtractorFunc extracts an integration check name from a config filename (without extension).
// It returns the check name and an error if the filename does not conform to the expected convention.
type NameExtractorFunc func(filenameWithoutExt string) (checkName string, err error)

// DefaultCRDNameExtractor returns the portion of the filename after the last '_'.
// e.g. "mynamespace_mypod_redis" → "redis"
func DefaultCRDNameExtractor(filenameWithoutExt string) (string, error) {
idx := strings.LastIndex(filenameWithoutExt, "_")
if idx < 0 || idx == len(filenameWithoutExt)-1 {
return "", fmt.Errorf("filename %q does not match expected convention <NAMESPACE_NAME_CHECKNAME>", filenameWithoutExt)
}
return filenameWithoutExt[idx+1:], nil
}

// CRDFileConfigProvider collects check configurations from a directory populated
// by an external CRD controller and mounted into the agent container via a
// Kubernetes ConfigMap.
type CRDFileConfigProvider struct {
dir string
nameExtractor NameExtractorFunc
Errors map[string]string
telemetryStore *telemetry.Store
}

// NewCRDFileConfigProvider creates a new CRDFileConfigProvider.
func NewCRDFileConfigProvider(dir string, extractor NameExtractorFunc, telemetryStore *telemetry.Store) *CRDFileConfigProvider {
return &CRDFileConfigProvider{
dir: dir,
nameExtractor: extractor,
Errors: make(map[string]string),
telemetryStore: telemetryStore,
}
}

// Collect returns the check configurations found in the CRD config directory.
// Configs with advanced AD identifiers (kube_services, kube_endpoints CEL selectors) are filtered out.
func (c *CRDFileConfigProvider) Collect(_ context.Context) ([]integration.Config, error) {
entries, err := os.ReadDir(c.dir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
log.Warnf("CRDFileConfigProvider: directory %q does not exist (expected mount point from ConfigMap)", c.dir)
} else {
log.Warnf("CRDFileConfigProvider: error reading directory %q: %s", c.dir, err)
}
return []integration.Config{}, nil
}

if len(entries) == 0 {
log.Debugf("CRDFileConfigProvider: directory %q is empty (no CRD-driven checks configured)", c.dir)
return []integration.Config{}, nil
}

integrationErrors := make(map[string]string)
var configs []integration.Config

for _, entry := range entries {
if entry.IsDir() {
continue
}

fileName := entry.Name()
ext := filepath.Ext(fileName)
if ext != ".yaml" && ext != ".yml" {
continue
}

filenameWithoutExt := strings.TrimSuffix(fileName, ext)
checkName, err := c.nameExtractor(filenameWithoutExt)
if err != nil {
log.Warnf("CRDFileConfigProvider: skipping file %q: %s", fileName, err)
integrationErrors[filenameWithoutExt] = err.Error()
continue
}

absPath := filepath.Join(c.dir, fileName)
conf, _, err := GetIntegrationConfigFromFile(checkName, absPath)
if err != nil {
log.Warnf("CRDFileConfigProvider: %q is not a valid config file: %s", absPath, err)
integrationErrors[checkName] = err.Error()
continue
}

if !WithoutAdvancedAD(conf) {
log.Debugf("CRDFileConfigProvider: skipping config %q with advanced AD identifiers", checkName)
continue
}

configs = append(configs, conf)
}

c.Errors = integrationErrors
if c.telemetryStore != nil {
c.telemetryStore.Errors.Set(float64(len(integrationErrors)), names.CRDFile)
}

return configs, nil
}

// IsUpToDate always returns false — polling is driven by the config poller.
func (c *CRDFileConfigProvider) IsUpToDate(_ context.Context) (bool, error) {
return false, nil
}

// String returns a string representation of the CRDFileConfigProvider.
func (c *CRDFileConfigProvider) String() string {
return names.CRDFile
}

// GetConfigErrors returns the errors encountered when collecting configs.
func (c *CRDFileConfigProvider) GetConfigErrors() map[string]types.ErrorMsgSet {
return make(map[string]types.ErrorMsgSet)
}
Loading
Loading