From 711715d54d9f12a0ca1b553b2b52064590801a30 Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Mon, 27 Jan 2025 18:27:44 +0100 Subject: [PATCH 1/7] Fine tune watch selectors for less memory usage Signed-off-by: Peter Wilcsinszky --- main.go | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 64 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index 2a8b98cea..e67955b15 100644 --- a/main.go +++ b/main.go @@ -31,11 +31,14 @@ import ( prometheusOperator "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "github.com/spf13/cast" appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" clientgoscheme "k8s.io/client-go/kubernetes/scheme" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/klog/v2" @@ -48,6 +51,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + telemetryv1alpha1 "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" + extensionsControllers "github.com/kube-logging/logging-operator/controllers/extensions" loggingControllers "github.com/kube-logging/logging-operator/controllers/logging" extensionsv1alpha1 "github.com/kube-logging/logging-operator/pkg/sdk/extensions/api/v1alpha1" @@ -56,7 +61,6 @@ import ( loggingv1beta1 "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/types" "github.com/kube-logging/logging-operator/pkg/webhook/podhandler" - telemetryv1alpha1 "github.com/kube-logging/telemetry-controller/api/telemetry/v1alpha1" // +kubebuilder:scaffold:imports ) @@ -84,6 +88,8 @@ func main() { var enableprofile bool var namespace string var loggingRef string + var watchLabeledChildren bool + var watchLabeledSecrets bool var finalizerCleanup bool var enableTelemetryControllerRoute bool var klogLevel int @@ -98,6 +104,8 @@ func main() { flag.BoolVar(&enableprofile, "pprof", false, "Enable pprof") flag.StringVar(&namespace, "watch-namespace", "", "Namespace to filter the list of watched objects") flag.StringVar(&loggingRef, "watch-logging-name", "", "Logging resource name to optionally filter the list of watched objects based on which logging they belong to by checking the app.kubernetes.io/managed-by label") + flag.BoolVar(&watchLabeledChildren, "watch-labeled-children", false, "Only watch child resources with logging operator's name label selector: app.kubernetes.io/name: fluentd|fluentbit|syslog-ng") + flag.BoolVar(&watchLabeledSecrets, "watch-labeled-secrets", false, "Only watch secrets with the following label selector: logging.banzaicloud.io/watch: enabled") flag.BoolVar(&finalizerCleanup, "finalizer-cleanup", false, "Remove finalizers from Logging resources during operator shutdown, useful for Helm uninstallation") flag.BoolVar(&enableTelemetryControllerRoute, "enable-telemetry-controller-route", false, "Enable the Telemetry Controller route for Logging resources") flag.StringVar(&syncPeriod, "sync-period", "", "SyncPeriod determines the minimum frequency at which watched resources are reconciled. Defaults to 10 hours. Parsed using time.ParseDuration.") @@ -152,7 +160,12 @@ func main() { mgrOptions.WebhookServer = webhookServer } - customMgrOptions, err := setupCustomCache(&mgrOptions, syncPeriod, namespace, loggingRef) + customMgrOptions, err := setupCustomCache(&mgrOptions, syncPeriod, namespace, loggingRef, watchLabeledChildren) + if watchLabeledSecrets { + customMgrOptions.Cache.ByObject[&corev1.Secret{}] = cache.ByObject{ + Label: labels.Set{"logging.banzaicloud.io/watch": "enabled"}.AsSelector(), + } + } if err != nil { setupLog.Error(err, "unable to set up custom cache settings") os.Exit(1) @@ -312,7 +325,7 @@ func detectContainerRuntime(ctx context.Context, c client.Reader) error { return nil } -func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace string, loggingRef string) (*ctrl.Options, error) { +func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace string, loggingRef string, watchLabeledChildren bool) (*ctrl.Options, error) { if syncPeriod != "" { duration, err := time.ParseDuration(syncPeriod) if err != nil { @@ -321,7 +334,7 @@ func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace str mgrOptions.Cache.SyncPeriod = &duration } - if namespace == "" && loggingRef == "" { + if namespace == "" && loggingRef == "" && !watchLabeledChildren { return mgrOptions, nil } @@ -333,6 +346,21 @@ func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace str if loggingRef != "" { labelSelector = labels.Set{"app.kubernetes.io/managed-by": loggingRef}.AsSelector() } + if watchLabeledChildren { + if labelSelector == nil { + labelSelector = labels.NewSelector() + } + // It would be much better to watch for a common label, but we don't have that yet. + // Adding a new label would recreate statefulsets and daemonsets which would be undesirable. + // Let's see how this works in the wild. We can optimize in a subsequent iteration. + req, err := labels.NewRequirement("app.kubernetes.io/name", selection.In, []string{ + "fluentd", "syslog-ng", "fluentbit", + }) + if err != nil { + return nil, err + } + labelSelector = labelSelector.Add(*req) + } mgrOptions.Cache = cache.Options{ ByObject: map[client.Object]cache.ByObject{ @@ -340,6 +368,34 @@ func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace str Field: namespaceSelector, Label: labelSelector, }, + &batchv1.Job{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &corev1.Service{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.Role{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.ClusterRole{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.RoleBinding{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.ClusterRoleBinding{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &corev1.ServiceAccount{}: { + Field: namespaceSelector, + Label: labelSelector, + }, &appsv1.DaemonSet{}: { Field: namespaceSelector, Label: labelSelector, @@ -356,6 +412,10 @@ func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace str Field: namespaceSelector, Label: labelSelector, }, + &corev1.ConfigMap{}: { + Field: namespaceSelector, + Label: labelSelector, + }, }, } From 72c881033864a373fc4652711c0b6a4734f8d255 Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Thu, 30 Jan 2025 15:31:39 +0100 Subject: [PATCH 2/7] fix the secret label watch to work without child resource watch Signed-off-by: Peter Wilcsinszky --- main.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index e67955b15..7b33a071c 100644 --- a/main.go +++ b/main.go @@ -161,15 +161,18 @@ func main() { } customMgrOptions, err := setupCustomCache(&mgrOptions, syncPeriod, namespace, loggingRef, watchLabeledChildren) + if err != nil { + setupLog.Error(err, "unable to set up custom cache settings") + os.Exit(1) + } if watchLabeledSecrets { + if customMgrOptions.Cache.ByObject == nil { + customMgrOptions.Cache.ByObject = make(map[client.Object]cache.ByObject) + } customMgrOptions.Cache.ByObject[&corev1.Secret{}] = cache.ByObject{ Label: labels.Set{"logging.banzaicloud.io/watch": "enabled"}.AsSelector(), } } - if err != nil { - setupLog.Error(err, "unable to set up custom cache settings") - os.Exit(1) - } if enableprofile { setupLog.Info("enabling pprof") From b8327828eca5aa735ceb01d638351b6d502c8a88 Mon Sep 17 00:00:00 2001 From: Peter Wilcsinszky Date: Thu, 30 Jan 2025 15:32:13 +0100 Subject: [PATCH 3/7] add watch labels to all managed secrets Signed-off-by: Peter Wilcsinszky Co-authored-by: Bence Csati --- pkg/resources/fluentbit/configsecret.go | 8 ++++++-- pkg/resources/fluentd/appconfigmap.go | 19 ++++++++++++++++--- pkg/resources/fluentd/configsecret.go | 12 +++++++++--- pkg/resources/fluentd/outputsecret.go | 5 +++++ pkg/resources/syslogng/configcheck.go | 13 ++++++++++++- pkg/resources/syslogng/configsecret.go | 5 +++++ pkg/resources/syslogng/outputsecret.go | 5 +++++ 7 files changed, 58 insertions(+), 9 deletions(-) diff --git a/pkg/resources/fluentbit/configsecret.go b/pkg/resources/fluentbit/configsecret.go index ede0e2587..69b372555 100644 --- a/pkg/resources/fluentbit/configsecret.go +++ b/pkg/resources/fluentbit/configsecret.go @@ -448,9 +448,13 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er } r.configs = confs - + meta := r.FluentbitObjectMeta(fluentBitSecretConfigName) + meta.Labels = utils.MergeLabels( + meta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) return &corev1.Secret{ - ObjectMeta: r.FluentbitObjectMeta(fluentBitSecretConfigName), + ObjectMeta: meta, Data: confs, }, reconciler.StatePresent, nil } diff --git a/pkg/resources/fluentd/appconfigmap.go b/pkg/resources/fluentd/appconfigmap.go index 9f22addbc..98d85689d 100644 --- a/pkg/resources/fluentd/appconfigmap.go +++ b/pkg/resources/fluentd/appconfigmap.go @@ -21,6 +21,7 @@ import ( "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/reconciler" + "github.com/cisco-open/operator-tools/pkg/utils" "github.com/spf13/cast" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -49,9 +50,11 @@ func (r *Reconciler) appConfigSecret() (runtime.Object, reconciler.DesiredState, } else { data[AppConfigKey] = []byte(*r.config) } + meta := r.FluentdObjectMeta(AppSecretConfigName, ComponentFluentd) + meta.Labels = utils.MergeLabels(meta.Labels, map[string]string{"logging.banzaicloud.io/watch": "enabled"}) return &corev1.Secret{ - ObjectMeta: r.FluentdObjectMeta(AppSecretConfigName, ComponentFluentd), + ObjectMeta: meta, Data: data, }, reconciler.StatePresent, nil } @@ -214,8 +217,11 @@ func (r *Reconciler) newCheckSecret(hashKey string, fluentdSpec v1beta1.FluentdS data[ConfigCheckKey] = []byte(*r.config) } data["fluent.conf"] = []byte(fluentdConfigCheckTemplate) + meta := r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-%s", hashKey), ComponentConfigCheck) + meta.Labels = utils.MergeLabels(meta.Labels, map[string]string{"logging.banzaicloud.io/watch": "enabled"}) + return &corev1.Secret{ - ObjectMeta: r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-%s", hashKey), ComponentConfigCheck), + ObjectMeta: meta, Data: data, }, nil } @@ -229,8 +235,11 @@ func (r *Reconciler) newCheckSecretAppConfig(hashKey string, fluentdSpec v1beta1 } else { data[ConfigCheckKey] = []byte(*r.config) } + meta := r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-app-%s", hashKey), ComponentConfigCheck) + meta.Labels = utils.MergeLabels(meta.Labels, map[string]string{"logging.banzaicloud.io/watch": "enabled"}) + return &corev1.Secret{ - ObjectMeta: r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-app-%s", hashKey), ComponentConfigCheck), + ObjectMeta: meta, Data: data, }, nil } @@ -242,6 +251,10 @@ func (r *Reconciler) newCheckOutputSecret(hashKey string) (*corev1.Secret, error } if secret, ok := obj.(*corev1.Secret); ok { secret.ObjectMeta = r.FluentdObjectMeta(fmt.Sprintf("fluentd-configcheck-output-%s", hashKey), ComponentConfigCheck) + secret.ObjectMeta.Labels = utils.MergeLabels( + secret.ObjectMeta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) return secret, nil } return nil, errors.New("output secret is invalid, unable to create output secret for config check") diff --git a/pkg/resources/fluentd/configsecret.go b/pkg/resources/fluentd/configsecret.go index 4d7e3932e..aafe03742 100644 --- a/pkg/resources/fluentd/configsecret.go +++ b/pkg/resources/fluentd/configsecret.go @@ -21,9 +21,11 @@ import ( "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/reconciler" - "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" + "github.com/cisco-open/operator-tools/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + + "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" ) type fluentdConfig struct { @@ -96,10 +98,14 @@ func (r *Reconciler) secretConfig() (runtime.Object, reconciler.DesiredState, er return nil, nil, err } configMap["fluentlog.conf"] = []byte(fmt.Sprintf(fluentLog, r.fluentdSpec.FluentLogDestination)) + meta := r.FluentdObjectMeta(SecretConfigName, ComponentFluentd) + meta.Labels = utils.MergeLabels( + meta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) configs := &corev1.Secret{ - ObjectMeta: r.FluentdObjectMeta(SecretConfigName, ComponentFluentd), + ObjectMeta: meta, Data: configMap, } - return configs, reconciler.StatePresent, nil } diff --git a/pkg/resources/fluentd/outputsecret.go b/pkg/resources/fluentd/outputsecret.go index e153c84f7..d2c16c9f6 100644 --- a/pkg/resources/fluentd/outputsecret.go +++ b/pkg/resources/fluentd/outputsecret.go @@ -21,6 +21,7 @@ import ( "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/cisco-open/operator-tools/pkg/secret" + "github.com/cisco-open/operator-tools/pkg/utils" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -76,6 +77,10 @@ func (r *Reconciler) outputSecret(secrets *secret.MountSecrets, mountPath string fluentOutputSecret.Data[secret.MappedKey] = secret.Value } } + fluentOutputSecret.ObjectMeta.Labels = utils.MergeLabels( + fluentOutputSecret.ObjectMeta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) return fluentOutputSecret, reconciler.StatePresent, nil } diff --git a/pkg/resources/syslogng/configcheck.go b/pkg/resources/syslogng/configcheck.go index 02df3379e..dcf80e89a 100644 --- a/pkg/resources/syslogng/configcheck.go +++ b/pkg/resources/syslogng/configcheck.go @@ -22,6 +22,7 @@ import ( "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/merge" + "github.com/cisco-open/operator-tools/pkg/utils" "github.com/spf13/cast" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -163,8 +164,13 @@ func (r *Reconciler) configCheck(ctx context.Context) (*ConfigCheckResult, error } func (r *Reconciler) newCheckSecret(hashKey string) (*corev1.Secret, error) { + meta := r.SyslogNGObjectMeta(configCheckResourceName(hashKey), ComponentConfigCheck) + meta.Labels = utils.MergeLabels( + meta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) return &corev1.Secret{ - ObjectMeta: r.SyslogNGObjectMeta(configCheckResourceName(hashKey), ComponentConfigCheck), + ObjectMeta: meta, Data: map[string][]byte{ configKey: []byte(r.config), }, @@ -178,6 +184,11 @@ func (r *Reconciler) newCheckOutputSecret(hashKey string) (*corev1.Secret, error } if secret, ok := obj.(*corev1.Secret); ok { secret.ObjectMeta = r.SyslogNGObjectMeta(fmt.Sprintf("syslog-ng-configcheck-output-%s", hashKey), ComponentConfigCheck) + secret.ObjectMeta.Labels = utils.MergeLabels( + secret.ObjectMeta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) + return secret, nil } return nil, errors.New("output secret is invalid, unable to create output secret for config check") diff --git a/pkg/resources/syslogng/configsecret.go b/pkg/resources/syslogng/configsecret.go index ea0d7a815..cfbb1d967 100644 --- a/pkg/resources/syslogng/configsecret.go +++ b/pkg/resources/syslogng/configsecret.go @@ -16,6 +16,7 @@ package syslogng import ( "github.com/cisco-open/operator-tools/pkg/reconciler" + "github.com/cisco-open/operator-tools/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -27,6 +28,10 @@ func (r *Reconciler) configSecret() (runtime.Object, reconciler.DesiredState, er configKey: []byte(r.config), }, } + secret.ObjectMeta.Labels = utils.MergeLabels( + secret.ObjectMeta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) return secret, reconciler.StatePresent, nil } diff --git a/pkg/resources/syslogng/outputsecret.go b/pkg/resources/syslogng/outputsecret.go index bd1c2a685..edf5f0614 100644 --- a/pkg/resources/syslogng/outputsecret.go +++ b/pkg/resources/syslogng/outputsecret.go @@ -21,6 +21,7 @@ import ( "emperror.dev/errors" "github.com/cisco-open/operator-tools/pkg/reconciler" "github.com/cisco-open/operator-tools/pkg/secret" + "github.com/cisco-open/operator-tools/pkg/utils" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -62,6 +63,10 @@ func (r *Reconciler) outputSecret(secrets *secret.MountSecrets, mountPath string Namespace: r.Logging.Spec.ControlNamespace, }, } + syslogNGOutputSecret.ObjectMeta.Labels = utils.MergeLabels( + syslogNGOutputSecret.ObjectMeta.Labels, + map[string]string{"logging.banzaicloud.io/watch": "enabled"}, + ) if syslogNGOutputSecret.Data == nil { syslogNGOutputSecret.Data = make(map[string][]byte) } From e85f0f18492e0c989dcea8d6f25546701a47a61f Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Mon, 3 Feb 2025 13:47:02 +0100 Subject: [PATCH 4/7] fix: inline error check in previous tests Signed-off-by: Bence Csati --- .../logging_metrics_monitoring_test.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/e2e/logging_metrics_monitoring/logging_metrics_monitoring_test.go b/e2e/logging_metrics_monitoring/logging_metrics_monitoring_test.go index 2ed4771c4..b28efd758 100644 --- a/e2e/logging_metrics_monitoring/logging_metrics_monitoring_test.go +++ b/e2e/logging_metrics_monitoring/logging_metrics_monitoring_test.go @@ -252,12 +252,11 @@ func TestLoggingMetrics_Monitoring(t *testing.T) { func installPrometheusOperator(c common.Cluster) error { manager := helm.New(c.KubeConfigFilePath()) - err := manager.RunRepo(helm.WithArgs("add", "prometheus-community", "https://prometheus-community.github.io/helm-charts")) - if err != nil { - return fmt.Errorf("failed to add prometheus-community repo: %w", err) + if err := manager.RunRepo(helm.WithArgs("add", "prometheus-community", "https://prometheus-community.github.io/helm-charts")); err != nil { + return fmt.Errorf("failed to add prometheus-community repo: %v", err) } - err = manager.RunInstall( + if err := manager.RunInstall( helm.WithName("prometheus"), helm.WithChart("prometheus-community/kube-prometheus-stack"), helm.WithArgs("--create-namespace"), @@ -265,9 +264,8 @@ func installPrometheusOperator(c common.Cluster) error { helm.WithArgs("--set", "prometheus.prometheusSpec.serviceMonitorSelectorNilUsesHelmValues=false"), helm.WithArgs("--set", "prometheus.prometheusSpec.podMonitorSelectorNilUsesHelmValues=false"), helm.WithWait(), - ) - if err != nil { - return fmt.Errorf("failed to install prometheus-operator: %w", err) + ); err != nil { + return fmt.Errorf("failed to install prometheus: %v", err) } return nil From 180e769b2726911258bea65d043aece0eff2d138 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Mon, 3 Feb 2025 13:47:33 +0100 Subject: [PATCH 5/7] feat: add e2e test for watch selector Signed-off-by: Bence Csati --- e2e/common/setup/loggingoperator.go | 2 + e2e/watch-selector/watch_selector_test.go | 208 ++++++++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 e2e/watch-selector/watch_selector_test.go diff --git a/e2e/common/setup/loggingoperator.go b/e2e/common/setup/loggingoperator.go index 9a11dde3b..872edc68f 100644 --- a/e2e/common/setup/loggingoperator.go +++ b/e2e/common/setup/loggingoperator.go @@ -137,6 +137,7 @@ func LoggingOperator(t *testing.T, c common.Cluster, opts ...LoggingOperatorOpti "value": "/covdatafiles", }, }, + "extraArgs": opt.Args, }) if err != nil { t.Fatalf("helm chart install: %s", err) @@ -173,4 +174,5 @@ type LoggingOperatorOptions struct { NameOverride string PollInterval time.Duration Timeout time.Duration + Args []string } diff --git a/e2e/watch-selector/watch_selector_test.go b/e2e/watch-selector/watch_selector_test.go new file mode 100644 index 000000000..cd7aa490a --- /dev/null +++ b/e2e/watch-selector/watch_selector_test.go @@ -0,0 +1,208 @@ +// Copyright © 2025 Kube logging authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watch_selector + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/e2e-framework/third_party/helm" + + "github.com/kube-logging/logging-operator/e2e/common" + "github.com/kube-logging/logging-operator/e2e/common/cond" + "github.com/kube-logging/logging-operator/e2e/common/setup" + "github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1" + "github.com/stretchr/testify/require" +) + +var TestTempDir string + +func init() { + var ok bool + TestTempDir, ok = os.LookupEnv("PROJECT_DIR") + if !ok { + TestTempDir = "../.." + } + TestTempDir = filepath.Join(TestTempDir, "build/_test") + err := os.MkdirAll(TestTempDir, os.FileMode(0755)) + if err != nil { + panic(err) + } +} + +func TestWatchSelectors(t *testing.T) { + common.Initialize(t) + ns := "test" + releaseNameOverride := "e2e" + common.WithCluster("watch-selector", t, func(t *testing.T, c common.Cluster) { + setup.LoggingOperator(t, c, setup.LoggingOperatorOptionFunc(func(options *setup.LoggingOperatorOptions) { + options.Namespace = ns + options.NameOverride = releaseNameOverride + options.Args = []string{"-enable-leader-election=true", "-watch-labeled-children=true", "-watch-labeled-secrets=true"} + })) + + ctx := context.Background() + + // Managed logging resource which creates a fluentd pod with a secret named: watch-selector-test-fluentd + logging := v1beta1.Logging{ + ObjectMeta: metav1.ObjectMeta{ + Name: "watch-selector-test", + Namespace: ns, + }, + Spec: v1beta1.LoggingSpec{ + ControlNamespace: ns, + FluentbitSpec: &v1beta1.FluentbitSpec{}, + FluentdSpec: &v1beta1.FluentdSpec{ + Image: v1beta1.ImageSpec{ + Repository: common.FluentdImageRepo, + Tag: common.FluentdImageTag, + }, + }, + }, + } + common.RequireNoError(t, c.GetClient().Create(ctx, &logging)) + + // Unmanaged resources + common.RequireNoError(t, installFluentdSts(c)) + + unmanagedSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "unmanaged-fluentd-secret", + Namespace: ns, + Labels: map[string]string{ + "app": "fluentd", + }, + }, + Data: map[string][]byte{ + "key": []byte("value"), + }, + } + common.RequireNoError(t, c.GetClient().Create(ctx, unmanagedSecret)) + + require.Eventually(t, func() bool { + if isManagedFluentdPodRunning := cond.PodShouldBeRunning(t, c.GetClient(), client.ObjectKey{Namespace: ns, Name: logging.Name + "-fluentd-0"}); !isManagedFluentdPodRunning() { + t.Logf("managed fluentd pod is not running") + return false + } + + if isUnmanagedFluentdPodRunning := cond.PodShouldBeRunning(t, c.GetClient(), client.ObjectKey{Namespace: "fluentd", Name: "fluentd-0"}); !isUnmanagedFluentdPodRunning() { + t.Logf("unmanaged fluentd pod is not running") + return false + } + + return true + }, 5*time.Minute, 3*time.Second) + + deployedLogging := &v1beta1.Logging{} + common.RequireNoError(t, c.GetClient().Get(ctx, client.ObjectKeyFromObject(&logging), deployedLogging)) + + // Check if the managed resources are actually controlled by the logging resource + managedSts := &appsv1.StatefulSet{} + common.RequireNoError(t, c.GetClient().Get(ctx, client.ObjectKey{Namespace: ns, Name: deployedLogging.Name + "-fluentd"}, managedSts)) + stsOwnerRefMeta := metav1.GetControllerOf(managedSts) + require.NotNil(t, stsOwnerRefMeta) + + require.Equal(t, deployedLogging.APIVersion, stsOwnerRefMeta.APIVersion) + require.Equal(t, deployedLogging.Kind, stsOwnerRefMeta.Kind) + require.Equal(t, deployedLogging.Name, stsOwnerRefMeta.Name) + require.True(t, *stsOwnerRefMeta.Controller) + + managedSecret := &corev1.Secret{} + common.RequireNoError(t, c.GetClient().Get(ctx, client.ObjectKey{Namespace: ns, Name: deployedLogging.Name + "-fluentd"}, managedSecret)) + secretOwnerRefMeta := metav1.GetControllerOf(managedSecret) + require.NotNil(t, secretOwnerRefMeta) + + require.Equal(t, deployedLogging.APIVersion, secretOwnerRefMeta.APIVersion) + require.Equal(t, deployedLogging.Kind, secretOwnerRefMeta.Kind) + require.Equal(t, deployedLogging.Name, secretOwnerRefMeta.Name) + require.True(t, *secretOwnerRefMeta.Controller) + + // Check if the unmanaged resources are actually not controlled by the operator + unmanagedSts := &appsv1.StatefulSet{} + common.RequireNoError(t, c.GetClient().Get(ctx, client.ObjectKey{Namespace: "fluentd", Name: "fluentd"}, unmanagedSts)) + secretOwnerRefMeta = metav1.GetControllerOf(unmanagedSts) + require.Nil(t, secretOwnerRefMeta) + + secret := &corev1.Secret{} + common.RequireNoError(t, c.GetClient().Get(ctx, client.ObjectKeyFromObject(unmanagedSecret), secret)) + secretOwnerRefMeta = metav1.GetControllerOf(secret) + require.Nil(t, secretOwnerRefMeta) + + }, func(t *testing.T, c common.Cluster) error { + path := filepath.Join(TestTempDir, fmt.Sprintf("cluster-%s.log", t.Name())) + t.Logf("Printing cluster logs to %s", path) + err := c.PrintLogs(common.PrintLogConfig{ + Namespaces: []string{ns, "default"}, + FilePath: path, + Limit: 100 * 1000, + }) + if err != nil { + return err + } + + loggingOperatorName := "logging-operator-" + releaseNameOverride + t.Logf("Collecting coverage files from logging-operator: %s/%s", ns, loggingOperatorName) + err = c.CollectTestCoverageFiles(ns, loggingOperatorName) + if err != nil { + t.Logf("Failed collecting coverage files: %s", err) + } + return err + + }, func(o *cluster.Options) { + if o.Scheme == nil { + o.Scheme = runtime.NewScheme() + } + common.RequireNoError(t, v1beta1.AddToScheme(o.Scheme)) + common.RequireNoError(t, apiextensionsv1.AddToScheme(o.Scheme)) + common.RequireNoError(t, appsv1.AddToScheme(o.Scheme)) + common.RequireNoError(t, batchv1.AddToScheme(o.Scheme)) + common.RequireNoError(t, corev1.AddToScheme(o.Scheme)) + common.RequireNoError(t, rbacv1.AddToScheme(o.Scheme)) + }) +} + +func installFluentdSts(c common.Cluster) error { + manager := helm.New(c.KubeConfigFilePath()) + + if err := manager.RunRepo(helm.WithArgs("add", "fluent", "https://fluent.github.io/helm-charts")); err != nil { + return fmt.Errorf("failed to add fluent repo: %v", err) + } + + if err := manager.RunInstall( + helm.WithName("fluentd"), + helm.WithChart("fluent/fluentd"), + helm.WithArgs("--create-namespace"), + helm.WithNamespace("fluentd"), + helm.WithArgs("--set", "kind=StatefulSet"), + helm.WithWait(), + ); err != nil { + return fmt.Errorf("failed to install fluentd: %v", err) + } + + return nil +} From 1699a261574fc747beeca3cb283be47049583d49 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Mon, 3 Feb 2025 14:26:08 +0100 Subject: [PATCH 6/7] fix(test): increase timeout Signed-off-by: Bence Csati --- controllers/logging/logging_controller_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controllers/logging/logging_controller_test.go b/controllers/logging/logging_controller_test.go index 913a6ae13..058e86d17 100644 --- a/controllers/logging/logging_controller_test.go +++ b/controllers/logging/logging_controller_test.go @@ -57,7 +57,7 @@ var ( ) const ( - timeout = 5 * time.Second + timeout = 10 * time.Second ) func TestFluentdResourcesCreatedAndRemoved(t *testing.T) { From 5f5aee023fc1890b5a6b782c061cff39c31b5736 Mon Sep 17 00:00:00 2001 From: Bence Csati Date: Mon, 3 Feb 2025 15:13:03 +0100 Subject: [PATCH 7/7] fix: -watch-namespace flag Signed-off-by: Bence Csati --- main.go | 161 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 82 insertions(+), 79 deletions(-) diff --git a/main.go b/main.go index 7b33a071c..71dfd65f3 100644 --- a/main.go +++ b/main.go @@ -337,89 +337,92 @@ func setupCustomCache(mgrOptions *ctrl.Options, syncPeriod string, namespace str mgrOptions.Cache.SyncPeriod = &duration } - if namespace == "" && loggingRef == "" && !watchLabeledChildren { - return mgrOptions, nil - } - - var namespaceSelector fields.Selector - var labelSelector labels.Selector - if namespace != "" { - namespaceSelector = fields.Set{"metadata.namespace": namespace}.AsSelector() - } - if loggingRef != "" { - labelSelector = labels.Set{"app.kubernetes.io/managed-by": loggingRef}.AsSelector() - } - if watchLabeledChildren { - if labelSelector == nil { - labelSelector = labels.NewSelector() + if namespace != "" || loggingRef != "" || watchLabeledChildren { + var namespaceSelector fields.Selector + var labelSelector labels.Selector + if namespace != "" { + namespaceSelector = fields.Set{"metadata.namespace": namespace}.AsSelector() + mgrOptions.Cache.DefaultNamespaces = map[string]cache.Config{ + namespace: { + FieldSelector: namespaceSelector, + }, + } } - // It would be much better to watch for a common label, but we don't have that yet. - // Adding a new label would recreate statefulsets and daemonsets which would be undesirable. - // Let's see how this works in the wild. We can optimize in a subsequent iteration. - req, err := labels.NewRequirement("app.kubernetes.io/name", selection.In, []string{ - "fluentd", "syslog-ng", "fluentbit", - }) - if err != nil { - return nil, err + if loggingRef != "" { + labelSelector = labels.Set{"app.kubernetes.io/managed-by": loggingRef}.AsSelector() + } + if watchLabeledChildren { + if labelSelector == nil { + labelSelector = labels.NewSelector() + } + // It would be much better to watch for a common label, but we don't have that yet. + // Adding a new label would recreate statefulsets and daemonsets which would be undesirable. + // Let's see how this works in the wild. We can optimize in a subsequent iteration. + req, err := labels.NewRequirement("app.kubernetes.io/name", selection.In, []string{ + "fluentd", "syslog-ng", "fluentbit", + }) + if err != nil { + return nil, err + } + labelSelector = labelSelector.Add(*req) } - labelSelector = labelSelector.Add(*req) - } - mgrOptions.Cache = cache.Options{ - ByObject: map[client.Object]cache.ByObject{ - &corev1.Pod{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &batchv1.Job{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &corev1.Service{}: { - Field: namespaceSelector, - Label: labelSelector, + mgrOptions.Cache = cache.Options{ + ByObject: map[client.Object]cache.ByObject{ + &corev1.Pod{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &batchv1.Job{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &corev1.Service{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.Role{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.ClusterRole{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.RoleBinding{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &rbacv1.ClusterRoleBinding{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &corev1.ServiceAccount{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &appsv1.DaemonSet{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &appsv1.StatefulSet{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &appsv1.Deployment{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &corev1.PersistentVolumeClaim{}: { + Field: namespaceSelector, + Label: labelSelector, + }, + &corev1.ConfigMap{}: { + Field: namespaceSelector, + Label: labelSelector, + }, }, - &rbacv1.Role{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &rbacv1.ClusterRole{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &rbacv1.RoleBinding{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &rbacv1.ClusterRoleBinding{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &corev1.ServiceAccount{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &appsv1.DaemonSet{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &appsv1.StatefulSet{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &appsv1.Deployment{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &corev1.PersistentVolumeClaim{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - &corev1.ConfigMap{}: { - Field: namespaceSelector, - Label: labelSelector, - }, - }, + } } return mgrOptions, nil