@@ -11,6 +11,7 @@ import (
1111 "k8s.io/apimachinery/pkg/runtime"
1212 "k8s.io/apimachinery/pkg/types"
1313 "sigs.k8s.io/controller-runtime/pkg/client"
14+ "sigs.k8s.io/controller-runtime/pkg/cluster"
1415 "sigs.k8s.io/controller-runtime/pkg/controller"
1516 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
1617 "sigs.k8s.io/controller-runtime/pkg/event"
@@ -54,6 +55,7 @@ import (
5455 "github.com/mongodb/mongodb-kubernetes/pkg/dns"
5556 "github.com/mongodb/mongodb-kubernetes/pkg/images"
5657 "github.com/mongodb/mongodb-kubernetes/pkg/kube"
58+ "github.com/mongodb/mongodb-kubernetes/pkg/multicluster"
5759 "github.com/mongodb/mongodb-kubernetes/pkg/statefulset"
5860 "github.com/mongodb/mongodb-kubernetes/pkg/util"
5961 "github.com/mongodb/mongodb-kubernetes/pkg/util/architectures"
@@ -70,6 +72,7 @@ import (
7072type ReconcileMongoDbReplicaSet struct {
7173 * ReconcileCommonController
7274 omConnectionFactory om.ConnectionFactory
75+ memberClustersMap map [string ]client.Client
7376 imageUrls images.ImageUrls
7477 forceEnterprise bool
7578 enableClusterMongoDBRoles bool
@@ -185,6 +188,7 @@ func (r *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R
185188 log .Infow ("ReplicaSet.Spec" , "spec" , rs .Spec , "desiredReplicas" , scale .ReplicasThisReconciliation (rs ), "isScaling" , scale .IsStillScaling (rs ))
186189 log .Infow ("ReplicaSet.Status" , "status" , rs .Status )
187190
191+ // TODO: adapt validations to multi cluster
188192 if err := rs .ProcessValidationsOnReconcile (nil ); err != nil {
189193 return r .updateStatus (ctx , workflow .Invalid ("%s" , err .Error ()))
190194 }
@@ -324,10 +328,16 @@ func (r *ReplicaSetReconcilerHelper) Reconcile(ctx context.Context) (reconcile.R
324328 return r .updateStatus (ctx , workflow .OK (), mdbstatus .NewBaseUrlOption (deployment .Link (conn .BaseURL (), conn .GroupID ())), mdbstatus .MembersOption (rs ), mdbstatus .NewPVCsStatusOptionEmptyStatus ())
325329}
326330
327- func newReplicaSetReconciler (ctx context.Context , kubeClient client.Client , imageUrls images.ImageUrls , initDatabaseNonStaticImageVersion , databaseNonStaticImageVersion string , forceEnterprise bool , enableClusterMongoDBRoles bool , omFunc om.ConnectionFactory ) * ReconcileMongoDbReplicaSet {
331+ func newReplicaSetReconciler (ctx context.Context , kubeClient client.Client , imageUrls images.ImageUrls , initDatabaseNonStaticImageVersion , databaseNonStaticImageVersion string , forceEnterprise bool , enableClusterMongoDBRoles bool , memberClusterMap map [string ]client.Client , omFunc om.ConnectionFactory ) * ReconcileMongoDbReplicaSet {
332+ // Initialize member cluster map for single-cluster mode (like ShardedCluster does)
333+ // This ensures that even in single-cluster deployments, we have a __default member cluster
334+ // This allows the same reconciliation logic to work for both single and multi-cluster topologies
335+ memberClusterMap = multicluster .InitializeGlobalMemberClusterMapForSingleCluster (memberClusterMap , kubeClient )
336+
328337 return & ReconcileMongoDbReplicaSet {
329338 ReconcileCommonController : NewReconcileCommonController (ctx , kubeClient ),
330339 omConnectionFactory : omFunc ,
340+ memberClustersMap : memberClusterMap ,
331341 imageUrls : imageUrls ,
332342 forceEnterprise : forceEnterprise ,
333343 enableClusterMongoDBRoles : enableClusterMongoDBRoles ,
@@ -600,9 +610,9 @@ func (r *ReplicaSetReconcilerHelper) buildStatefulSetOptions(ctx context.Context
600610
601611// AddReplicaSetController creates a new MongoDbReplicaset Controller and adds it to the Manager. The Manager will set fields on the Controller
602612// and Start it when the Manager is Started.
603- func AddReplicaSetController (ctx context.Context , mgr manager.Manager , imageUrls images.ImageUrls , initDatabaseNonStaticImageVersion , databaseNonStaticImageVersion string , forceEnterprise bool , enableClusterMongoDBRoles bool ) error {
613+ func AddReplicaSetController (ctx context.Context , mgr manager.Manager , imageUrls images.ImageUrls , initDatabaseNonStaticImageVersion , databaseNonStaticImageVersion string , forceEnterprise bool , enableClusterMongoDBRoles bool , memberClustersMap map [ string ]cluster. Cluster ) error {
604614 // Create a new controller
605- reconciler := newReplicaSetReconciler (ctx , mgr .GetClient (), imageUrls , initDatabaseNonStaticImageVersion , databaseNonStaticImageVersion , forceEnterprise , enableClusterMongoDBRoles , om .NewOpsManagerConnection )
615+ reconciler := newReplicaSetReconciler (ctx , mgr .GetClient (), imageUrls , initDatabaseNonStaticImageVersion , databaseNonStaticImageVersion , forceEnterprise , enableClusterMongoDBRoles , multicluster . ClustersMapToClientMap ( memberClustersMap ), om .NewOpsManagerConnection )
606616 c , err := controller .New (util .MongoDbReplicaSetController , mgr , controller.Options {Reconciler : reconciler , MaxConcurrentReconciles : env .ReadIntOrDefault (util .MaxConcurrentReconcilesEnv , 1 )}) // nolint:forbidigo
607617 if err != nil {
608618 return err
@@ -672,11 +682,49 @@ func AddReplicaSetController(ctx context.Context, mgr manager.Manager, imageUrls
672682 return err
673683 }
674684
685+ if err := reconciler .configureMultiCluster (ctx , mgr , c , memberClustersMap ); err != nil {
686+ return xerrors .Errorf ("failed to configure replica set controller in multi cluster mode: %w" , err )
687+ }
688+
675689 zap .S ().Infof ("Registered controller %s" , util .MongoDbReplicaSetController )
676690
677691 return nil
678692}
679693
694+ func (r * ReconcileMongoDbReplicaSet ) configureMultiCluster (ctx context.Context , mgr manager.Manager , c controller.Controller , memberClustersMap map [string ]cluster.Cluster ) error {
695+ // TODO: Add cross-cluster StatefulSet watches for drift detection (like MongoDBMultiReplicaSet)
696+ // This will enable automatic reconciliation when users manually modify StatefulSets in member clusters, based on the MongoDBMultiResourceAnnotation annotation
697+ // for k, v := range memberClustersMap {
698+ // err := c.Watch(source.Kind[client.Object](v.GetCache(), &appsv1.StatefulSet{}, &khandler.EnqueueRequestForOwnerMultiCluster{}, watch.PredicatesForMultiStatefulSet()))
699+ // if err != nil {
700+ // return xerrors.Errorf("failed to set Watch on member cluster: %s, err: %w", k, err)
701+ // }
702+ // }
703+
704+ // TODO: Add member cluster health monitoring for automatic failover (like MongoDBMultiReplicaSet)
705+ // Need to:
706+ // - Start WatchMemberClusterHealth goroutine
707+ // - Watch event channel for health changes
708+ // - Modify memberwatch.WatchMemberClusterHealth to handle MongoDBReplicaSet (currently only handles MongoDBMultiCluster)
709+ //
710+ // eventChannel := make(chan event.GenericEvent)
711+ // memberClusterHealthChecker := memberwatch.MemberClusterHealthChecker{Cache: make(map[string]*memberwatch.MemberHeathCheck)}
712+ // go memberClusterHealthChecker.WatchMemberClusterHealth(ctx, zap.S(), eventChannel, r.client, memberClustersMap)
713+ // err := c.Watch(source.Channel[client.Object](eventChannel, &handler.EnqueueRequestForObject{}))
714+
715+ // TODO: Add ConfigMap watch for dynamic member list changes (like MongoDBMultiReplicaSet)
716+ // This enables runtime updates to which clusters are part of the deployment
717+ // err := c.Watch(source.Kind[client.Object](mgr.GetCache(), &corev1.ConfigMap{},
718+ // watch.ConfigMapEventHandler{
719+ // ConfigMapName: util.MemberListConfigMapName,
720+ // ConfigMapNamespace: env.ReadOrPanic(util.CurrentNamespace),
721+ // },
722+ // predicate.ResourceVersionChangedPredicate{},
723+ // ))
724+
725+ return nil
726+ }
727+
680728// updateOmDeploymentRs performs OM registration operation for the replicaset. So the changes will be finally propagated
681729// to automation agents in containers
682730func (r * ReplicaSetReconcilerHelper ) updateOmDeploymentRs (ctx context.Context , conn om.Connection , membersNumberBefore int , tlsCertPath , internalClusterCertPath string , deploymentOptions deploymentOptionsRS , shouldMirrorKeyfileForMongot bool , isRecovering bool ) workflow.Status {
@@ -752,6 +800,7 @@ func (r *ReplicaSetReconcilerHelper) updateOmDeploymentRs(ctx context.Context, c
752800 return workflow .Failed (err )
753801 }
754802
803+ // TODO: check if updateStatus usage is correct here
755804 if status := reconciler .ensureBackupConfigurationAndUpdateStatus (ctx , conn , rs , reconciler .SecretClient , log ); ! status .IsOK () && ! isRecovering {
756805 return status
757806 }
0 commit comments