Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions internal/cmd/manager/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
coreWebhook "github.com/OT-CONTAINER-KIT/redis-operator/internal/webhook"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/time/rate"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -232,10 +234,16 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu

healer := redis.NewHealer(k8sClient)

// Use custom rate limiter for large clusters
controllerOpts := controller.Options{
MaxConcurrentReconciles: maxConcurrentReconciles,
RateLimiter: newCustomRateLimiter(),
}

if err := (&rediscontroller.Reconciler{
Client: mgr.GetClient(),
K8sClient: k8sClient,
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
}).SetupWithManager(mgr, controllerOpts); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Redis")
return err
}
Expand All @@ -246,7 +254,7 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
Checker: redis.NewChecker(k8sClient),
Recorder: mgr.GetEventRecorderFor("rediscluster-controller"),
StatefulSet: k8sutils.NewStatefulSetService(k8sClient),
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
}).SetupWithManager(mgr, controllerOpts); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RedisCluster")
return err
}
Expand All @@ -255,7 +263,7 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
K8sClient: k8sClient,
Healer: healer,
StatefulSet: k8sutils.NewStatefulSetService(k8sClient),
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
}).SetupWithManager(mgr, controllerOpts); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RedisReplication")
return err
}
Expand All @@ -265,7 +273,7 @@ func setupControllers(mgr ctrl.Manager, k8sClient kubernetes.Interface, maxConcu
Healer: healer,
K8sClient: k8sClient,
ReplicationWatcher: intctrlutil.NewResourceWatcher(),
}).SetupWithManager(mgr, controller.Options{MaxConcurrentReconciles: maxConcurrentReconciles}); err != nil {
}).SetupWithManager(mgr, controllerOpts); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "RedisSentinel")
return err
}
Expand Down Expand Up @@ -314,3 +322,11 @@ func setupHealthChecks(mgr ctrl.Manager) error {

return nil
}

// newCustomRateLimiter creates a rate limiter with max backoff of 30s instead of default 1000s.
func newCustomRateLimiter() workqueue.RateLimiter {
return workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 30*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 200)},
)
}
4 changes: 3 additions & 1 deletion internal/controller/redis/redis_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/OT-CONTAINER-KIT/redis-operator/internal/controller/common"
intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/internal/controllerutil"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/k8sutils"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -67,13 +68,14 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if err != nil {
return intctrlutil.RequeueE(ctx, err, "failed to create service")
}
return intctrlutil.RequeueAfter(ctx, time.Second*10, "requeue after 10 seconds")
return intctrlutil.RequeueAfter(ctx, time.Minute*5, "periodic reconcile")
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, opts controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rvb2.Redis{}).
Owns(&appsv1.StatefulSet{}).
WithOptions(opts).
Complete(r)
}
8 changes: 4 additions & 4 deletions internal/controller/rediscluster/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// Check if the cluster is downscaled
if leaderCount := r.GetStatefulSetReplicas(ctx, instance.Namespace, instance.Name+"-leader"); leaderReplicas < leaderCount {
if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") || !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower") {
return intctrlutil.Reconciled()
return intctrlutil.RequeueAfter(ctx, time.Second*30, "statefulset not ready during downscale")
}
if masterCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); masterCount == leaderCount {
r.Recorder.Event(instance, corev1.EventTypeNormal, events.EventReasonRedisClusterDownscale, "Redis cluster is downscaling...")
Expand Down Expand Up @@ -122,7 +122,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
monitoring.RedisClusterRebalanceTotal.WithLabelValues(instance.Namespace, instance.Name).Inc()
return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
return intctrlutil.RequeueAfter(ctx, time.Second*30, "requeue after cluster rebalance")
} else {
logger.Info("masterCount is not equal to leader statefulset replicas,skip downscale", "masterCount", masterCount, "leaderReplicas", leaderReplicas)
}
Expand Down Expand Up @@ -196,7 +196,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") || !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower") {
return intctrlutil.Reconciled()
return intctrlutil.RequeueAfter(ctx, time.Second*30, "statefulset not ready")
}

// Mark the cluster status as bootstrapping if all the leader and follower nodes are ready
Expand Down Expand Up @@ -344,7 +344,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
}

return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
return intctrlutil.RequeueAfter(ctx, time.Minute*5, "periodic reconcile")
}

func (r *Reconciler) updateStatus(ctx context.Context, rc *rcvb2.RedisCluster, status rcvb2.RedisClusterStatus) (requeue bool, err error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/OT-CONTAINER-KIT/redis-operator/internal/k8sutils"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/monitoring"
"github.com/OT-CONTAINER-KIT/redis-operator/internal/service/redis"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -74,7 +75,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}
}

return intctrlutil.RequeueAfter(ctx, time.Second*30, "")
return intctrlutil.RequeueAfter(ctx, time.Minute*5, "periodic reconcile")
}

func (r *Reconciler) UpdateRedisReplicationMaster(ctx context.Context, instance *rrvb2.RedisReplication, masterNode string) error {
Expand Down Expand Up @@ -450,6 +451,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, rr *rrvb2.RedisReplicatio
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, opts controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
For(&rrvb2.RedisReplication{}).
Owns(&appsv1.StatefulSet{}).
WithOptions(opts).
Complete(r)
}
Loading