45 changes: 45 additions & 0 deletions controllers/om/automation_status.go
@@ -2,7 +2,9 @@ package om

import (
"encoding/json"
"errors"
"fmt"
"github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow"
"maps"
"slices"
"sort"
@@ -40,6 +42,14 @@ func buildAutomationStatusFromBytes(b []byte) (*AutomationStatus, error) {
return as, nil
}

type PendingErr struct {
msg string
}

func (e PendingErr) Error() string {
return e.msg
}

// WaitForReadyState waits until the agents for relevant processes reach their state
func WaitForReadyState(oc Connection, processNames []string, supressErrors bool, log *zap.SugaredLogger) error {
if len(processNames) == 0 {
@@ -72,6 +82,41 @@ func WaitForReadyState(oc Connection, processNames []string, supressErrors bool,
return nil
}

func CheckForReadyState(oc Connection, processNames []string, log *zap.SugaredLogger) workflow.Status {
err := CheckForReadyStateReturningError(oc, processNames, log)

if err != nil {
pendingErr := PendingErr{}
if ok := errors.As(err, &pendingErr); ok {
return workflow.Pending(pendingErr.Error())
}

return workflow.Failed(err)
}

return workflow.OK()
}

func CheckForReadyStateReturningError(oc Connection, processNames []string, log *zap.SugaredLogger) error {
if len(processNames) == 0 {
log.Infow("Not checking for MongoDB agents to reach READY state (no expected processes to check)")
return nil
}

log.Infow("Checking if MongoDB agents reached READY state...", "processes", processNames)
as, err := oc.ReadAutomationStatus()
if err != nil {
return xerrors.Errorf("Error reading Automation Agents status: %s", err)
}

if allReachedGoalState, msg := checkAutomationStatusIsGoal(as, processNames, log); allReachedGoalState {
log.Info("MongoDB agents have reached READY state")
return nil
} else {
return PendingErr{fmt.Sprintf("MongoDB agents haven't reached READY state; %s", msg)}
}
}

// CheckAutomationStatusIsGoal returns true if all the relevant processes are in Goal
// state.
// Note that the function is quite tolerant of any situation except a non-matching goal state, for example
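Reviewer note: the sketch below shows the consumption pattern the new `PendingErr` sentinel is meant to enable at call sites that are allowed to requeue. It mirrors the new `CheckForReadyState` wrapper above rather than adding anything new; `reconcileExample` is a hypothetical function and the import paths are inferred from this diff.

```go
package example

import (
	"errors"

	"go.uber.org/zap"

	"github.com/mongodb/mongodb-kubernetes/controllers/om"
	"github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow"
)

// reconcileExample turns the PendingErr sentinel into a non-failing Pending status,
// so the reconciler requeues instead of reporting a failure.
func reconcileExample(conn om.Connection, processNames []string, log *zap.SugaredLogger) workflow.Status {
	if err := om.CheckForReadyStateReturningError(conn, processNames, log); err != nil {
		pendingErr := om.PendingErr{}
		if errors.As(err, &pendingErr) {
			// Agents are reachable but have not hit goal state yet: requeue and retry.
			return workflow.Pending(pendingErr.Error())
		}
		// Anything else (e.g. failing to read the automation status) is a hard failure.
		return workflow.Failed(err)
	}
	return workflow.OK()
}
```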
2 changes: 1 addition & 1 deletion controllers/om/replicaset/om_replicaset.go
@@ -58,7 +58,7 @@ func PrepareScaleDownFromMap(omClient om.Connection, rsMembers map[string][]stri
return xerrors.Errorf("unable to set votes, priority to 0 in Ops Manager, hosts: %v, err: %w", processes, err)
}

if err := om.WaitForReadyState(omClient, processesToWaitForGoalState, false, log); err != nil {
if err := om.CheckForReadyStateReturningError(omClient, processesToWaitForGoalState, log); err != nil {
return err
}

5 changes: 5 additions & 0 deletions controllers/operator/appdbreplicaset_controller.go
@@ -2,6 +2,7 @@ package operator

import (
"context"
"errors"
"fmt"
"path"
"sort"
@@ -550,6 +551,10 @@ func (r *ReconcileAppDbReplicaSet) ReconcileAppDB(ctx context.Context, opsManage
// it's possible that Ops Manager will not be available when we attempt to configure AppDB monitoring
// in Ops Manager. This is not a blocker to continue with the rest of the reconciliation.
if err != nil {
pendingErr := om.PendingErr{}
if ok := errors.As(err, &pendingErr); ok {
return r.updateStatus(ctx, opsManager, workflow.Pending(pendingErr.Error()), log, omStatusOption)
}
log.Errorf("Unable to configure monitoring of AppDB: %s, configuration will be attempted next reconciliation.", err)

if podVars.ProjectID != "" {
5 changes: 3 additions & 2 deletions controllers/operator/authentication/authentication.go
@@ -91,7 +91,7 @@ func Configure(conn om.Connection, opts Options, isRecovering bool, log *zap.Sug
if isRecovering {
return nil
}
return om.WaitForReadyState(conn, opts.ProcessNames, false, log)
return om.CheckForReadyStateReturningError(conn, opts.ProcessNames, log)
}

// we need to make sure the desired authentication mechanism for the agent exists. If the desired agent
@@ -172,6 +172,7 @@ func Disable(conn om.Connection, opts Options, deleteUsers bool, log *zap.Sugare
return xerrors.Errorf("error read/updating automation config: %w", err)
}

// Disable is also called from the onDelete path, so we cannot requeue here; we must wait
if err := om.WaitForReadyState(conn, opts.ProcessNames, false, log); err != nil {
return xerrors.Errorf("error waiting for ready state: %w", err)
}
@@ -222,7 +223,7 @@ func Disable(conn om.Connection, opts Options, deleteUsers bool, log *zap.Sugare
return xerrors.Errorf("error read/updating backup agent config: %w", err)
}

if err := om.WaitForReadyState(conn, opts.ProcessNames, false, log); err != nil {
if err := om.CheckForReadyStateReturningError(conn, opts.ProcessNames, log); err != nil {
return xerrors.Errorf("error waiting for ready state: %w", err)
}

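Reviewer note: `Disable` wraps the check's error with `xerrors.Errorf("error waiting for ready state: %w", err)` before returning it, and the controllers detect the sentinel with `errors.As`, which walks `%w` chains. The standalone sketch below (hypothetical `main`; import paths inferred from this diff) demonstrates that the match survives the wrapping:

```go
package main

import (
	"errors"
	"fmt"

	"golang.org/x/xerrors"

	"github.com/mongodb/mongodb-kubernetes/controllers/om"
)

func main() {
	// Simulate the wrapping done in Disable: the sentinel is wrapped with %w.
	wrapped := xerrors.Errorf("error waiting for ready state: %w", om.PendingErr{})

	// errors.As unwraps %w chains, so callers further up the stack can still
	// tell "agents not ready yet" apart from a hard failure.
	pendingErr := om.PendingErr{}
	fmt.Println(errors.As(wrapped, &pendingErr)) // prints: true
}
```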
15 changes: 12 additions & 3 deletions controllers/operator/common_controller.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"path/filepath"
"reflect"
@@ -427,9 +428,12 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,
return workflow.Failed(err), false
}

// we need to wait for all agents to be ready before configuring any authentication settings
if err := om.WaitForReadyState(conn, processNames, isRecovering, log); err != nil {
return workflow.Failed(err), false
if !isRecovering {
if workflowStatus := om.CheckForReadyState(conn, processNames, log); !workflowStatus.IsOK() {
return workflowStatus, false
}
} else {
log.Warnf("Ignoring checking for ready state due to recovering")
}

clientCerts := util.OptionalClientCertficates
@@ -515,6 +519,10 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,
}

if err := authentication.Configure(conn, authOpts, isRecovering, log); err != nil {
pendingErr := om.PendingErr{}
if ok := errors.As(err, &pendingErr); ok {
return workflow.Pending(pendingErr.Error()), false
}
return workflow.Failed(err), false
}
} else if wantToEnableAuthentication {
@@ -534,6 +542,7 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,

authOpts.UserOptions = userOpts
if err := authentication.Disable(conn, authOpts, false, log); err != nil {

return workflow.Failed(err), false
}
}
12 changes: 11 additions & 1 deletion controllers/operator/mongodbmultireplicaset_controller.go
@@ -3,6 +3,7 @@ package operator
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sort"
@@ -213,6 +214,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
status := workflow.RunInGivenOrder(publishAutomationConfigFirst,
func() workflow.Status {
if err := r.updateOmDeploymentRs(ctx, conn, mrs, agentCertPath, tlsCertPath, internalClusterCertPath, false, log); err != nil {
pendingErr := om.PendingErr{}
if ok := errors.As(err, &pendingErr); ok {
return workflow.Pending(pendingErr.Error())
}
return workflow.Failed(err)
}
return workflow.OK()
@@ -789,9 +794,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
reachableProcessNames = append(reachableProcessNames, proc.Name())
}
}
if err := om.WaitForReadyState(conn, reachableProcessNames, isRecovering, log); err != nil && !isRecovering {
if isRecovering {
return nil
}

if err := om.CheckForReadyStateReturningError(conn, reachableProcessNames, log); err != nil {
return err
}

return nil
}

13 changes: 11 additions & 2 deletions controllers/operator/mongodbreplicaset_controller.go
@@ -2,6 +2,7 @@ package operator

import (
"context"
goerrors "errors"
"fmt"

"go.uber.org/zap"
@@ -240,6 +241,10 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco

if scale.ReplicasThisReconciliation(rs) < rs.Status.Members {
if err := replicaset.PrepareScaleDownFromStatefulSet(conn, sts, rs, log); err != nil {
pendingErr := om.PendingErr{}
if ok := goerrors.As(err, &pendingErr); ok {
return r.updateStatus(ctx, rs, workflow.Pending(pendingErr.Error()), log)
}
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log)
}
}
@@ -512,8 +517,12 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
return workflow.Failed(err)
}

if err := om.WaitForReadyState(conn, processNames, isRecovering, log); err != nil {
return workflow.Failed(err)
if !isRecovering {
if workflowStatus := om.CheckForReadyState(conn, processNames, log); !workflowStatus.IsOK() {
return workflowStatus
}
} else {
log.Warnf("Ignoring checking for ready state due to recovering")
}

reconcileResult, _ := ReconcileLogRotateSetting(conn, rs.Spec.Agent, log)
36 changes: 23 additions & 13 deletions controllers/operator/mongodbshardedcluster_controller.go
@@ -2,7 +2,9 @@ package operator

import (
"context"
goerrors "errors"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"slices"
"sort"
"strings"
@@ -11,7 +13,6 @@ import (
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
"golang.org/x/xerrors"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
@@ -1611,6 +1612,7 @@ func (r *ShardedClusterReconcileHelper) cleanOpsManagerState(ctx context.Context
}

logDiffOfProcessNames(processNames, r.getHealthyProcessNames(), log.With("ctx", "cleanOpsManagerState"))
// we're in the onDelete path, so we cannot requeue; we need to wait
if err := om.WaitForReadyState(conn, r.getHealthyProcessNames(), false, log); err != nil {
return err
}
@@ -1849,13 +1851,12 @@ func (r *ShardedClusterReconcileHelper) updateOmDeploymentShardedCluster(ctx con

healthyProcessesToWaitForReadyState := r.getHealthyProcessNamesToWaitForReadyState(conn, log)
logDiffOfProcessNames(processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "updateOmDeploymentShardedCluster"))
if err = om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
if !isRecovering {
if shardsRemoving {
return workflow.Pending("automation agents haven't reached READY state: shards removal in progress: %v", err)
}
return workflow.Failed(err)

if !isRecovering {
if workflowStatus := om.CheckForReadyState(conn, healthyProcessesToWaitForReadyState, log); !workflowStatus.IsOK() {
return workflowStatus
}
} else {
logWarnIgnoredDueToRecovery(log, err)
}

Expand All @@ -1873,12 +1874,16 @@ func (r *ShardedClusterReconcileHelper) updateOmDeploymentShardedCluster(ctx con

healthyProcessesToWaitForReadyState := r.getHealthyProcessNamesToWaitForReadyState(conn, log)
logDiffOfProcessNames(processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "shardsRemoving"))
if err = om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
if !isRecovering {
return workflow.Failed(xerrors.Errorf("automation agents haven't reached READY state while cleaning replica set and processes: %w", err))
}
if isRecovering {
logWarnIgnoredDueToRecovery(log, err)
}
if err = om.CheckForReadyStateReturningError(conn, healthyProcessesToWaitForReadyState, log); err != nil {
pendingErr := om.PendingErr{}
if ok := goerrors.As(err, &pendingErr); ok {
return workflow.Pending(pendingErr.Error())
}
return workflow.Failed(err)
}
}

currentHosts := r.getAllHostnames(false)
@@ -2042,8 +2047,13 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c

healthyProcessesToWaitForReadyState = r.getHealthyProcessNamesToWaitForReadyState(conn, log)
logDiffOfProcessNames(opts.processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "publishDeployment"))
if err := om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
return nil, shardsRemoving, workflow.Failed(err)

if !isRecovering {
if workflowStatus := om.CheckForReadyState(conn, healthyProcessesToWaitForReadyState, log); !workflowStatus.IsOK() {
return nil, shardsRemoving, workflowStatus
}
} else {
log.Warnf("Ignoring checking for ready state due to recovering")
}

if additionalReconciliationRequired {
8 changes: 6 additions & 2 deletions controllers/operator/mongodbstandalone_controller.go
@@ -353,8 +353,12 @@ func (r *ReconcileMongoDbStandalone) updateOmDeployment(ctx context.Context, con
return workflow.Failed(err)
}

if err := om.WaitForReadyState(conn, []string{set.Name}, isRecovering, log); err != nil {
return workflow.Failed(err)
if !isRecovering {
if workflowStatus := om.CheckForReadyState(conn, []string{set.Name}, log); !workflowStatus.IsOK() {
return workflowStatus
}
} else {
log.Warnf("Ignoring checking for ready state due to recovering")
}

if additionalReconciliationRequired {
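Reviewer note: the `if !isRecovering { check } else { log.Warnf(...) }` guard is now repeated in the common, replica set, sharded cluster, and standalone controllers. A possible follow-up (purely a sketch; `CheckForReadyStateUnlessRecovering` does not exist in this PR) would be to fold the guard into the `om` package next to `CheckForReadyState`:

```go
package om

import (
	"go.uber.org/zap"

	"github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow"
)

// CheckForReadyStateUnlessRecovering is a hypothetical helper: it centralizes the
// isRecovering guard that each controller currently spells out at the call site.
func CheckForReadyStateUnlessRecovering(oc Connection, processNames []string, isRecovering bool, log *zap.SugaredLogger) workflow.Status {
	if isRecovering {
		log.Warnf("Skipping the ready state check because automatic recovery is in progress")
		return workflow.OK()
	}
	return CheckForReadyState(oc, processNames, log)
}
```

Call sites would then collapse to a single line, and the recovery warning message would stay consistent across controllers.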
(changes to the Python e2e_search_enterprise_tls test file; file path and change counts not captured in this copy)
@@ -187,11 +187,6 @@ def test_create_search_resource(mdbs: MongoDBSearch):
mdbs.assert_reaches_phase(Phase.Running, timeout=300)


@mark.e2e_search_enterprise_tls
def test_wait_for_database_resource_ready(mdb: MongoDB):
mdb.assert_reaches_phase(Phase.Running, timeout=300)


@mark.e2e_search_enterprise_tls
def test_wait_for_mongod_parameters(mdb: MongoDB):
# After search CR is deployed, MongoDB controller will pick it up
@@ -216,6 +211,13 @@ def check_mongod_parameters():

run_periodically(check_mongod_parameters, timeout=200)

# After picking up the MongoDBSearch CR, the MongoDB reconciler will add the mongod parameters,
# but it will not immediately mark the MongoDB CR as Pending, so wait here until the resource
# settles back into the Running phase.
@mark.e2e_search_enterprise_tls
def test_wait_for_database_resource_ready(mdb: MongoDB):
mdb.assert_reaches_phase(Phase.Running, timeout=300)


@mark.e2e_search_enterprise_tls
def test_validate_tls_connections(mdb: MongoDB, mdbs: MongoDBSearch, namespace: str):
@@ -244,11 +246,6 @@ def test_search_assert_search_query(mdb: MongoDB):
# After MongoDB is upgraded, the role should be removed from the AC.
# From 8.2, searchCoordinator is a built-in role.
class TestUpgradeMongod:
def test_check_polyfilled_role_in_ac(self, mdb: MongoDB):
custom_roles = mdb.get_automation_config_tester().automation_config.get("roles", [])
assert len(custom_roles) > 0
assert "searchCoordinator" in [role["role"] for role in custom_roles]

def test_mongod_version(self, mdb: MongoDB):
# This test is redundant when looking at the context of the full test file,
# as we deploy MDB_VERSION_WITHOUT_BUILT_IN_ROLE initially
@@ -258,6 +255,11 @@ def test_mongod_version(self, mdb: MongoDB):
# or executed again when running locally.
mdb.tester(ca_path=get_issuer_ca_filepath(), use_ssl=True).assert_version(MDB_VERSION_WITHOUT_BUILT_IN_ROLE)

def test_check_polyfilled_role_in_ac(self, mdb: MongoDB):
custom_roles = mdb.get_automation_config_tester().automation_config.get("roles", [])
assert len(custom_roles) > 0
assert "searchCoordinator" in [role["role"] for role in custom_roles]

def test_upgrade_to_mongo_8_2(self, mdb: MongoDB):
mdb.set_version(MDB_VERSION_WITH_BUILT_IN_ROLE)
mdb.update()
Expand Down