diff --git a/internal/cmd/manager/cmd.go b/internal/cmd/manager/cmd.go
index e26ff17704..ff5aa3c16b 100644
--- a/internal/cmd/manager/cmd.go
+++ b/internal/cmd/manager/cmd.go
@@ -39,7 +39,9 @@ import (
 	coreWebhook "github.com/OT-CONTAINER-KIT/redis-operator/internal/webhook"
 	"github.com/spf13/cobra"
 	"github.com/spf13/viper"
+	"golang.org/x/time/rate"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/util/workqueue"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -232,10 +234,21 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
 
 	healer := redis.NewHealer(k8sClient)
 
+	// Use a custom rate limiter for large clusters. The options are built once
+	// per controller so every controller's workqueue gets its own limiter
+	// instance; one shared instance would mix per-item backoff state between
+	// objects of different kinds that have the same namespace and name.
+	newControllerOpts := func() controller.Options {
+		return controller.Options{
+			MaxConcurrentReconciles: maxConcurrentReconciles,
+			RateLimiter:             newCustomRateLimiter(),
+		}
+	}
+
 	if err := (&rediscontroller.Reconciler{
 		Client:    mgr.GetClient(),
 		K8sClient: k8sClient,
-	}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
+	}).SetupWithManager(mgr, newControllerOpts()); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "Redis")
 		return err
 	}
@@ -246,7 +259,7 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
 		Checker:     redis.NewChecker(k8sClient),
 		Recorder:    mgr.GetEventRecorderFor("rediscluster-controller"),
 		StatefulSet: k8sutils.NewStatefulSetService(k8sClient),
-	}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
+	}).SetupWithManager(mgr, newControllerOpts()); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "RedisCluster")
 		return err
 	}
@@ -255,7 +268,7 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
 		K8sClient:   k8sClient,
 		Healer:      healer,
 		StatefulSet: k8sutils.NewStatefulSetService(k8sClient),
-	}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
+	}).SetupWithManager(mgr, newControllerOpts()); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "RedisReplication")
 		return err
 	}
@@ -265,7 +278,7 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
 		Healer:             healer,
 		K8sClient:          k8sClient,
 		ReplicationWatcher: intctrlutil.NewResourceWatcher(),
-	}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
+	}).SetupWithManager(mgr, newControllerOpts()); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "RedisSentinel")
 		return err
 	}
@@ -314,3 +327,16 @@ func setupHealthChecks(mgr ctrl.Manager) error {
 
 	return nil
 }
+
+// newCustomRateLimiter builds the workqueue rate limiter handed to the
+// controllers. It takes the longer of two delays for every requeue:
+//   - a per-item exponential backoff that starts at 5ms and is capped at 30s
+//     instead of the default 1000s, and
+//   - an overall token bucket allowing 100 requeues per second with a burst
+//     of 200.
+func newCustomRateLimiter() workqueue.RateLimiter {
+	return workqueue.NewMaxOfRateLimiter(
+		workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
+		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 200)},
+	)
+}
diff --git a/internal/controller/redis/redis_controller.go b/internal/controller/redis/redis_controller.go
index bdf60a6b87..c290337aea 100644
--- a/internal/controller/redis/redis_controller.go
+++ b/internal/controller/redis/redis_controller.go
@@ -24,6 +24,7 @@ import (
 	"github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common"
 	intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/internal/controllerutil"
 	"github.com/OT-CONTAINER-KIT/redis-operator/internal/k8sutils"
+	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/client-go/kubernetes"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -67,13 +68,14 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	if err != nil {
 		return intctrlutil.RequeueE(ctx, err, "failed to create service")
 	}
-	return intctrlutil.RequeueAfter(ctx, time.Second*10, "requeue after 10 seconds")
+	return intctrlutil.RequeueAfter(ctx, time.Minute*5, "periodic reconcile")
 }
 
 // SetupWithManager sets up the controller with the Manager.
 func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, opts controller.Options) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&rvb2.Redis{}).
+		Owns(&appsv1.StatefulSet{}).
 		WithOptions(opts).
 		Complete(r)
 }
diff --git a/internal/controller/rediscluster/rediscluster_controller.go b/internal/controller/rediscluster/rediscluster_controller.go
index 48b3b9a37c..6269f285cd 100644
--- a/internal/controller/rediscluster/rediscluster_controller.go
+++ b/internal/controller/rediscluster/rediscluster_controller.go
@@ -85,7 +85,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	// Check if the cluster is downscaled
 	if leaderCount := r.GetStatefulSetReplicas(ctx, instance.Namespace, instance.Name+"-leader"); leaderReplicas < leaderCount {
 		if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") || !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower") {
-			return intctrlutil.Reconciled()
+			return intctrlutil.RequeueAfter(ctx, time.Second*30, "statefulset not ready during downscale")
 		}
 		if masterCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); masterCount == leaderCount {
 			r.Recorder.Event(instance, corev1.EventTypeNormal, events.EventReasonRedisClusterDownscale, "Redis cluster is downscaling...")
@@ -122,7 +122,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 			k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
 			logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
 			monitoring.RedisClusterRebalanceTotal.WithLabelValues(instance.Namespace, instance.Name).Inc()
-			return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
+			return intctrlutil.RequeueAfter(ctx, time.Second*30, "requeue after cluster rebalance")
 		} else {
 			logger.Info("masterCount is not equal to leader statefulset replicas,skip downscale", "masterCount", masterCount, "leaderReplicas", leaderReplicas)
 		}
@@ -196,7 +196,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 	}
 
 	if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") || !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower") {
-		return intctrlutil.Reconciled()
+		return intctrlutil.RequeueAfter(ctx, time.Second*30, "statefulset not ready")
 	}
 
 	// Mark the cluster status as bootstrapping if all the leader and follower nodes are ready
@@ -344,7 +344,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 		}
 	}
 
-	return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
+	return intctrlutil.RequeueAfter(ctx, time.Minute*5, "periodic reconcile")
 }
 
 func (r *Reconciler) updateStatus(ctx context.Context, rc *rcvb2.RedisCluster, status rcvb2.RedisClusterStatus) (requeue bool, err error) {
diff --git a/internal/controller/redisreplication/redisreplication_controller.go b/internal/controller/redisreplication/redisreplication_controller.go
index ddc9a331b4..58e2805a19 100644
--- a/internal/controller/redisreplication/redisreplication_controller.go
+++ b/internal/controller/redisreplication/redisreplication_controller.go
@@ -16,6 +16,7 @@ import (
 	"github.com/OT-CONTAINER-KIT/redis-operator/internal/k8sutils"
 	"github.com/OT-CONTAINER-KIT/redis-operator/internal/monitoring"
 	"github.com/OT-CONTAINER-KIT/redis-operator/internal/service/redis"
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
@@ -74,7 +75,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
 		}
 	}
 
-	return intctrlutil.RequeueAfter(ctx, time.Second*30, "")
+	return intctrlutil.RequeueAfter(ctx, time.Minute*5, "periodic reconcile")
 }
 
 func (r *Reconciler) UpdateRedisReplicationMaster(ctx context.Context, instance *rrvb2.RedisReplication, masterNode string) error {
@@ -450,6 +451,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, rr *rrvb2.RedisReplicatio
 func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, opts controller.Options) error {
 	return ctrl.NewControllerManagedBy(mgr).
 		For(&rrvb2.RedisReplication{}).
+		Owns(&appsv1.StatefulSet{}).
 		WithOptions(opts).
 		Complete(r)
 }