diff --git a/controllers/om/automation_status.go b/controllers/om/automation_status.go
index 502628cfa..15e54e6af 100644
--- a/controllers/om/automation_status.go
+++ b/controllers/om/automation_status.go
@@ -2,7 +2,9 @@ package om
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
+	"github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow"
 	"maps"
 	"slices"
 	"sort"
@@ -40,6 +42,14 @@ func buildAutomationStatusFromBytes(b []byte) (*AutomationStatus, error) {
 	return as, nil
 }
 
+type PendingErr struct {
+	msg string
+}
+
+func (e PendingErr) Error() string {
+	return e.msg
+}
+
 // WaitForReadyState waits until the agents for relevant processes reach their state
 func WaitForReadyState(oc Connection, processNames []string, supressErrors bool, log *zap.SugaredLogger) error {
 	if len(processNames) == 0 {
@@ -72,6 +82,41 @@ func WaitForReadyState(oc Connection, processNames []string, supressErrors bool,
 	return nil
 }
 
+func CheckForReadyState(oc Connection, processNames []string, log *zap.SugaredLogger) workflow.Status {
+	err := CheckForReadyStateReturningError(oc, processNames, log)
+
+	if err != nil {
+		pendingErr := PendingErr{}
+		if ok := errors.As(err, &pendingErr); ok {
+			return workflow.Pending(pendingErr.Error())
+		}
+
+		return workflow.Failed(err)
+	}
+
+	return workflow.OK()
+}
+
+func CheckForReadyStateReturningError(oc Connection, processNames []string, log *zap.SugaredLogger) error {
+	if len(processNames) == 0 {
+		log.Infow("Not checking for MongoDB agents to reach READY state (no expected processes to check)")
+		return nil
+	}
+
+	log.Infow("Checking if MongoDB agents reached READY state...", "processes", processNames)
+	as, err := oc.ReadAutomationStatus()
+	if err != nil {
+		return xerrors.Errorf("error reading Automation Agents status: %w", err)
+	}
+
+	if allReachedGoalState, msg := checkAutomationStatusIsGoal(as, processNames, log); allReachedGoalState {
+		log.Info("MongoDB agents have reached READY state")
+		return nil
+	} else {
+		return PendingErr{fmt.Sprintf("MongoDB agents haven't reached READY state; %s", msg)}
+	}
+}
+
 // CheckAutomationStatusIsGoal returns true if all the relevant processes are in Goal
 // state.
 // Note, that the function is quite tolerant to any situations except for non-matching goal state, for example
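A quick sanity check of the PendingErr contract, as an illustrative in-package sketch (not part of this patch): errors.As must recover a PendingErr through %w wrapping, which call sites such as authentication.Disable rely on, but cannot once the error has been flattened with %s.

```go
package om

import (
	"errors"
	"testing"

	"golang.org/x/xerrors"
)

// Sketch only: PendingErr must stay recoverable via errors.As at the
// reconciler boundary, so intermediate wrapping has to use %w, not %s.
func TestPendingErrSurvivesWrapping(t *testing.T) {
	base := PendingErr{msg: "agents haven't reached READY state"}

	// %w preserves the error chain, so errors.As still finds the PendingErr.
	wrapped := xerrors.Errorf("error waiting for ready state: %w", base)
	target := PendingErr{}
	if !errors.As(wrapped, &target) {
		t.Fatal("expected errors.As to match PendingErr through %w wrapping")
	}
	if target.Error() != "agents haven't reached READY state" {
		t.Fatalf("unexpected message: %s", target.Error())
	}

	// %s flattens the error into a plain string: the PendingErr identity is
	// lost and callers would fall back to workflow.Failed.
	flattened := xerrors.Errorf("error waiting for ready state: %s", base)
	if errors.As(flattened, &target) {
		t.Fatal("did not expect errors.As to match through %s formatting")
	}
}
```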
diff --git a/controllers/om/replicaset/om_replicaset.go b/controllers/om/replicaset/om_replicaset.go
index 2e72d2c3e..b98a122da 100644
--- a/controllers/om/replicaset/om_replicaset.go
+++ b/controllers/om/replicaset/om_replicaset.go
@@ -58,7 +58,7 @@ func PrepareScaleDownFromMap(omClient om.Connection, rsMembers map[string][]stri
 		return xerrors.Errorf("unable to set votes, priority to 0 in Ops Manager, hosts: %v, err: %w", processes, err)
 	}
 
-	if err := om.WaitForReadyState(omClient, processesToWaitForGoalState, false, log); err != nil {
+	if err := om.CheckForReadyStateReturningError(omClient, processesToWaitForGoalState, log); err != nil {
 		return err
 	}
 
diff --git a/controllers/operator/appdbreplicaset_controller.go b/controllers/operator/appdbreplicaset_controller.go
index fe68e07c6..b51607f5f 100644
--- a/controllers/operator/appdbreplicaset_controller.go
+++ b/controllers/operator/appdbreplicaset_controller.go
@@ -2,6 +2,7 @@ package operator
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"path"
 	"sort"
@@ -550,6 +551,10 @@ func (r *ReconcileAppDbReplicaSet) ReconcileAppDB(ctx context.Context, opsManage
 		// it's possible that Ops Manager will not be available when we attempt to configure AppDB monitoring
 		// in Ops Manager. This is not a blocker to continue with the rest of the reconciliation.
 		if err != nil {
+			pendingErr := om.PendingErr{}
+			if ok := errors.As(err, &pendingErr); ok {
+				return r.updateStatus(ctx, opsManager, workflow.Pending(pendingErr.Error()), log, omStatusOption)
+			}
 			log.Errorf("Unable to configure monitoring of AppDB: %s, configuration will be attempted next reconciliation.", err)
 
 			if podVars.ProjectID != "" {
diff --git a/controllers/operator/authentication/authentication.go b/controllers/operator/authentication/authentication.go
index c2e36735b..3b11e4a1c 100644
--- a/controllers/operator/authentication/authentication.go
+++ b/controllers/operator/authentication/authentication.go
@@ -91,7 +91,7 @@ func Configure(conn om.Connection, opts Options, isRecovering bool, log *zap.Sug
 	if isRecovering {
 		return nil
 	}
-	return om.WaitForReadyState(conn, opts.ProcessNames, false, log)
+	return om.CheckForReadyStateReturningError(conn, opts.ProcessNames, log)
 }
 
 // we need to make sure the desired authentication mechanism for the agent exists. If the desired agent
@@ -172,6 +172,7 @@ func Disable(conn om.Connection, opts Options, deleteUsers bool, log *zap.Sugare
 		return xerrors.Errorf("error read/updating automation config: %w", err)
 	}
 
+	// Disable is also called on delete, so we cannot requeue here; we must wait for the ready state
 	if err := om.WaitForReadyState(conn, opts.ProcessNames, false, log); err != nil {
 		return xerrors.Errorf("error waiting for ready state: %w", err)
 	}
@@ -222,7 +223,7 @@ func Disable(conn om.Connection, opts Options, deleteUsers bool, log *zap.Sugare
 		return xerrors.Errorf("error read/updating backup agent config: %w", err)
 	}
 
-	if err := om.WaitForReadyState(conn, opts.ProcessNames, false, log); err != nil {
+	if err := om.CheckForReadyStateReturningError(conn, opts.ProcessNames, log); err != nil {
 		return xerrors.Errorf("error waiting for ready state: %w", err)
 	}
 
diff --git a/controllers/operator/common_controller.go b/controllers/operator/common_controller.go
index e76cf4e81..43594d0df 100644
--- a/controllers/operator/common_controller.go
+++ b/controllers/operator/common_controller.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"encoding/pem"
+	"errors"
 	"fmt"
 	"path/filepath"
 	"reflect"
@@ -427,9 +428,12 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,
 		return workflow.Failed(err), false
 	}
 
-	// we need to wait for all agents to be ready before configuring any authentication settings
-	if err := om.WaitForReadyState(conn, processNames, isRecovering, log); err != nil {
-		return workflow.Failed(err), false
+	if !isRecovering {
+		if workflowStatus := om.CheckForReadyState(conn, processNames, log); !workflowStatus.IsOK() {
+			return workflowStatus, false
+		}
+	} else {
+		log.Warnf("Skipping the ready state check because recovery is in progress")
 	}
 
 	clientCerts := util.OptionalClientCertficates
@@ -515,6 +519,10 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,
 		}
 
 		if err := authentication.Configure(conn, authOpts, isRecovering, log); err != nil {
+			pendingErr := om.PendingErr{}
+			if ok := errors.As(err, &pendingErr); ok {
+				return workflow.Pending(pendingErr.Error()), false
+			}
 			return workflow.Failed(err), false
 		}
 	} else if wantToEnableAuthentication {
@@ -534,6 +542,7 @@
 
 		authOpts.UserOptions = userOpts
 		if err := authentication.Disable(conn, authOpts, false, log); err != nil {
+			return workflow.Failed(err), false
 		}
 	}
 
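One implication of the errors.As checks above for future call sites: the pending condition only survives if every intermediate layer wraps with %w. A minimal sketch under that assumption (ensureProcessesReady is a hypothetical helper, not part of this PR; the om import path is inferred from the file paths above):

```go
package operator

import (
	"go.uber.org/zap"
	"golang.org/x/xerrors"

	"github.com/mongodb/mongodb-kubernetes/controllers/om"
)

// Hypothetical helper, illustration only: a deeper layer that wants the
// reconciler to requeue as Pending must wrap with %w so that the
// errors.As(err, &pendingErr) checks in the controllers still match
// om.PendingErr.
func ensureProcessesReady(conn om.Connection, processNames []string, log *zap.SugaredLogger) error {
	if err := om.CheckForReadyStateReturningError(conn, processNames, log); err != nil {
		return xerrors.Errorf("deployment not ready for the next step: %w", err)
	}
	return nil
}
```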
diff --git a/controllers/operator/mongodbmultireplicaset_controller.go b/controllers/operator/mongodbmultireplicaset_controller.go
index 5d963c00e..c42376035 100644
--- a/controllers/operator/mongodbmultireplicaset_controller.go
+++ b/controllers/operator/mongodbmultireplicaset_controller.go
@@ -3,6 +3,7 @@ package operator
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"reflect"
 	"sort"
@@ -213,6 +214,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
 	status := workflow.RunInGivenOrder(publishAutomationConfigFirst,
 		func() workflow.Status {
 			if err := r.updateOmDeploymentRs(ctx, conn, mrs, agentCertPath, tlsCertPath, internalClusterCertPath, false, log); err != nil {
+				pendingErr := om.PendingErr{}
+				if ok := errors.As(err, &pendingErr); ok {
+					return workflow.Pending(pendingErr.Error())
+				}
 				return workflow.Failed(err)
 			}
 			return workflow.OK()
@@ -789,9 +794,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
 			reachableProcessNames = append(reachableProcessNames, proc.Name())
 		}
 	}
-	if err := om.WaitForReadyState(conn, reachableProcessNames, isRecovering, log); err != nil && !isRecovering {
+	if isRecovering {
+		return nil
+	}
+
+	if err := om.CheckForReadyStateReturningError(conn, reachableProcessNames, log); err != nil {
 		return err
 	}
+
 	return nil
 }
 
diff --git a/controllers/operator/mongodbreplicaset_controller.go b/controllers/operator/mongodbreplicaset_controller.go
index 470e56716..cc6111a1f 100644
--- a/controllers/operator/mongodbreplicaset_controller.go
+++ b/controllers/operator/mongodbreplicaset_controller.go
@@ -2,6 +2,7 @@ package operator
 
 import (
 	"context"
+	goerrors "errors"
 	"fmt"
 
 	"go.uber.org/zap"
@@ -240,6 +241,10 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
 
 	if scale.ReplicasThisReconciliation(rs) < rs.Status.Members {
 		if err := replicaset.PrepareScaleDownFromStatefulSet(conn, sts, rs, log); err != nil {
+			pendingErr := om.PendingErr{}
+			if ok := goerrors.As(err, &pendingErr); ok {
+				return r.updateStatus(ctx, rs, workflow.Pending(pendingErr.Error()), log)
+			}
 			return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log)
 		}
 	}
@@ -512,8 +517,12 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
 		return workflow.Failed(err)
 	}
 
-	if err := om.WaitForReadyState(conn, processNames, isRecovering, log); err != nil {
-		return workflow.Failed(err)
+	if !isRecovering {
+		if workflowStatus := om.CheckForReadyState(conn, processNames, log); !workflowStatus.IsOK() {
+			return workflowStatus
+		}
+	} else {
+		log.Warnf("Skipping the ready state check because recovery is in progress")
 	}
 
 	reconcileResult, _ := ReconcileLogRotateSetting(conn, rs.Spec.Agent, log)
diff --git a/controllers/operator/mongodbshardedcluster_controller.go b/controllers/operator/mongodbshardedcluster_controller.go
index abe384b16..c9ca7880f 100644
--- a/controllers/operator/mongodbshardedcluster_controller.go
+++ b/controllers/operator/mongodbshardedcluster_controller.go
@@ -2,7 +2,9 @@ package operator
 
 import (
 	"context"
+	goerrors "errors"
 	"fmt"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"slices"
 	"sort"
 	"strings"
@@ -11,7 +13,6 @@ import (
 	"github.com/hashicorp/go-multierror"
 	"go.uber.org/zap"
 	"golang.org/x/xerrors"
-	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/utils/ptr"
@@ -1611,6 +1612,7 @@ func (r *ShardedClusterReconcileHelper) cleanOpsManagerState(ctx context.Context
 	}
 
 	logDiffOfProcessNames(processNames, r.getHealthyProcessNames(), log.With("ctx", "cleanOpsManagerState"))
+	// we're in onDelete, so we cannot requeue and need to wait
 	if err := om.WaitForReadyState(conn, r.getHealthyProcessNames(), false, log); err != nil {
 		return err
 	}
@@ -1849,13 +1851,12 @@ func (r *ShardedClusterReconcileHelper) updateOmDeploymentShardedCluster(ctx con
 
 	healthyProcessesToWaitForReadyState := r.getHealthyProcessNamesToWaitForReadyState(conn, log)
 	logDiffOfProcessNames(processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "updateOmDeploymentShardedCluster"))
-	if err = om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
-		if !isRecovering {
-			if shardsRemoving {
-				return workflow.Pending("automation agents haven't reached READY state: shards removal in progress: %v", err)
-			}
-			return workflow.Failed(err)
-		}
-		logWarnIgnoredDueToRecovery(log, err)
-	}
+
+	if !isRecovering {
+		if workflowStatus := om.CheckForReadyState(conn, healthyProcessesToWaitForReadyState, log); !workflowStatus.IsOK() {
+			return workflowStatus
+		}
+	} else {
+		log.Warnf("Skipping the ready state check because recovery is in progress")
+	}
 
@@ -1873,12 +1874,15 @@ func (r *ShardedClusterReconcileHelper) updateOmDeploymentShardedCluster(ctx con
 
 		healthyProcessesToWaitForReadyState := r.getHealthyProcessNamesToWaitForReadyState(conn, log)
 		logDiffOfProcessNames(processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "shardsRemoving"))
-		if err = om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
-			if !isRecovering {
-				return workflow.Failed(xerrors.Errorf("automation agents haven't reached READY state while cleaning replica set and processes: %w", err))
-			}
-			logWarnIgnoredDueToRecovery(log, err)
-		}
+		if isRecovering {
+			log.Warnf("Skipping the ready state check because recovery is in progress")
+		} else if err = om.CheckForReadyStateReturningError(conn, healthyProcessesToWaitForReadyState, log); err != nil {
+			pendingErr := om.PendingErr{}
+			if ok := goerrors.As(err, &pendingErr); ok {
+				return workflow.Pending(pendingErr.Error())
+			}
+			return workflow.Failed(err)
+		}
 	}
 
 	currentHosts := r.getAllHostnames(false)
@@ -2042,8 +2047,13 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
 
 		healthyProcessesToWaitForReadyState = r.getHealthyProcessNamesToWaitForReadyState(conn, log)
 		logDiffOfProcessNames(opts.processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "publishDeployment"))
-		if err := om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
-			return nil, shardsRemoving, workflow.Failed(err)
+
+		if !isRecovering {
+			if workflowStatus := om.CheckForReadyState(conn, healthyProcessesToWaitForReadyState, log); !workflowStatus.IsOK() {
+				return nil, shardsRemoving, workflowStatus
+			}
+		} else {
+			log.Warnf("Skipping the ready state check because recovery is in progress")
 		}
 
 	if additionalReconciliationRequired {
diff --git a/controllers/operator/mongodbstandalone_controller.go b/controllers/operator/mongodbstandalone_controller.go
index 47ff489bb..ad5b20c13 100644
--- a/controllers/operator/mongodbstandalone_controller.go
+++ b/controllers/operator/mongodbstandalone_controller.go
@@ -353,8 +353,12 @@ func (r *ReconcileMongoDbStandalone) updateOmDeployment(ctx context.Context, con
 		return workflow.Failed(err)
 	}
 
-	if err := om.WaitForReadyState(conn, []string{set.Name}, isRecovering, log); err != nil {
-		return workflow.Failed(err)
+	if !isRecovering {
+		if workflowStatus := om.CheckForReadyState(conn, []string{set.Name}, log); !workflowStatus.IsOK() {
+			return workflowStatus
+		}
+	} else {
+		log.Warnf("Skipping the ready state check because recovery is in progress")
 	}
 
 	if additionalReconciliationRequired {
diff --git a/docker/mongodb-kubernetes-tests/tests/search/search_enterprise_tls.py b/docker/mongodb-kubernetes-tests/tests/search/search_enterprise_tls.py
index 72aa65e10..0ecb1a961 100644
--- a/docker/mongodb-kubernetes-tests/tests/search/search_enterprise_tls.py
+++ b/docker/mongodb-kubernetes-tests/tests/search/search_enterprise_tls.py
@@ -187,11 +187,6 @@ def test_create_search_resource(mdbs: MongoDBSearch):
     mdbs.assert_reaches_phase(Phase.Running, timeout=300)
 
 
-@mark.e2e_search_enterprise_tls
-def test_wait_for_database_resource_ready(mdb: MongoDB):
-    mdb.assert_reaches_phase(Phase.Running, timeout=300)
-
-
 @mark.e2e_search_enterprise_tls
 def test_wait_for_mongod_parameters(mdb: MongoDB):
     # After search CR is deployed, MongoDB controller will pick it up
@@ -216,6 +211,13 @@ def check_mongod_parameters():
     run_periodically(check_mongod_parameters, timeout=200)
 
 
+# After picking up the MongoDBSearch CR, the MongoDB reconciler will add the mongod parameters,
+# but it will not immediately mark the MongoDB CR as Pending, so we first wait for the
+# parameters to appear and only then wait for the resource to reach the Running phase.
+@mark.e2e_search_enterprise_tls
+def test_wait_for_database_resource_ready(mdb: MongoDB):
+    mdb.assert_reaches_phase(Phase.Running, timeout=300)
+
 
 @mark.e2e_search_enterprise_tls
 def test_validate_tls_connections(mdb: MongoDB, mdbs: MongoDBSearch, namespace: str):
@@ -244,11 +246,6 @@ def test_search_assert_search_query(mdb: MongoDB):
 # after mongodb is upgraded, the role should be removed from AC
 # From 8.2 searchCoordinator role is a built-in role.
 class TestUpgradeMongod:
-    def test_check_polyfilled_role_in_ac(self, mdb: MongoDB):
-        custom_roles = mdb.get_automation_config_tester().automation_config.get("roles", [])
-        assert len(custom_roles) > 0
-        assert "searchCoordinator" in [role["role"] for role in custom_roles]
-
     def test_mongod_version(self, mdb: MongoDB):
         # This test is redundant when looking at the context of the full test file,
         # as we deploy MDB_VERSION_WITHOUT_BUILT_IN_ROLE initially
@@ -258,6 +255,11 @@ def test_mongod_version(self, mdb: MongoDB):
         # or executed again when running locally.
         mdb.tester(ca_path=get_issuer_ca_filepath(), use_ssl=True).assert_version(MDB_VERSION_WITHOUT_BUILT_IN_ROLE)
 
+    def test_check_polyfilled_role_in_ac(self, mdb: MongoDB):
+        custom_roles = mdb.get_automation_config_tester().automation_config.get("roles", [])
+        assert len(custom_roles) > 0
+        assert "searchCoordinator" in [role["role"] for role in custom_roles]
+
     def test_upgrade_to_mongo_8_2(self, mdb: MongoDB):
         mdb.set_version(MDB_VERSION_WITH_BUILT_IN_ROLE)
         mdb.update()