Skip to content

Commit b7d6023

Browse files
committed
Check wait for the goal state without blocking
1 parent 853a47f commit b7d6023

10 files changed

+124
-31
lines changed

controllers/om/automation_status.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package om
22

33
import (
44
"encoding/json"
5+
"errors"
56
"fmt"
7+
"github.com/mongodb/mongodb-kubernetes/controllers/operator/workflow"
68
"maps"
79
"slices"
810
"sort"
@@ -40,6 +42,14 @@ func buildAutomationStatusFromBytes(b []byte) (*AutomationStatus, error) {
4042
return as, nil
4143
}
4244

45+
// PendingErr is the error returned by CheckForReadyStateReturningError when the
// automation agents have not reached READY state yet. It marks a condition
// that is expected to resolve on its own, so callers detect it with errors.As
// and report a pending workflow status instead of a failure.
//
// PendingErr must be returned by value (not by pointer): callers match it with
// a *PendingErr target, which only matches values of type PendingErr in the
// error chain.
type PendingErr struct {
	// msg is the human-readable description returned by Error.
	msg string
}

// Compile-time check that PendingErr satisfies the error interface.
var _ error = PendingErr{}

// Error returns the message describing why the state is still pending.
func (e PendingErr) Error() string {
	return e.msg
}
52+
4353
// WaitForReadyState waits until the agents for relevant processes reach their state
4454
func WaitForReadyState(oc Connection, processNames []string, supressErrors bool, log *zap.SugaredLogger) error {
4555
if len(processNames) == 0 {
@@ -72,6 +82,41 @@ func WaitForReadyState(oc Connection, processNames []string, supressErrors bool,
7282
return nil
7383
}
7484

85+
func CheckForReadyState(oc Connection, processNames []string, log *zap.SugaredLogger) workflow.Status {
86+
err := CheckForReadyStateReturningError(oc, processNames, log)
87+
88+
if err != nil {
89+
pendingErr := PendingErr{}
90+
if ok := errors.As(err, &pendingErr); ok {
91+
return workflow.Pending(pendingErr.Error())
92+
}
93+
94+
return workflow.Failed(err)
95+
}
96+
97+
return workflow.OK()
98+
}
99+
100+
func CheckForReadyStateReturningError(oc Connection, processNames []string, log *zap.SugaredLogger) error {
101+
if len(processNames) == 0 {
102+
log.Infow("Not checking for MongoDB agents to reach READY state (no expected processes to check)")
103+
return nil
104+
}
105+
106+
log.Infow("Checking if MongoDB agents reached READY state...", "processes", processNames)
107+
as, err := oc.ReadAutomationStatus()
108+
if err != nil {
109+
return xerrors.Errorf("Error reading Automation Agents status: %s", err)
110+
}
111+
112+
if allReachedGoalState, msg := checkAutomationStatusIsGoal(as, processNames, log); allReachedGoalState {
113+
log.Info("MongoDB agents have reached READY state")
114+
return nil
115+
} else {
116+
return PendingErr{fmt.Sprintf("MongoDB agents haven't reached READY state; %s", msg)}
117+
}
118+
}
119+
75120
// CheckAutomationStatusIsGoal returns true if all the relevant processes are in Goal
76121
// state.
77122
// Note, that the function is quite tolerant to any situations except for non-matching goal state, for example

controllers/om/replicaset/om_replicaset.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func PrepareScaleDownFromMap(omClient om.Connection, rsMembers map[string][]stri
5858
return xerrors.Errorf("unable to set votes, priority to 0 in Ops Manager, hosts: %v, err: %w", processes, err)
5959
}
6060

61-
if err := om.WaitForReadyState(omClient, processesToWaitForGoalState, false, log); err != nil {
61+
if err := om.CheckForReadyStateReturningError(omClient, processesToWaitForGoalState, log); err != nil {
6262
return err
6363
}
6464

controllers/operator/appdbreplicaset_controller.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package operator
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"path"
78
"sort"
@@ -550,6 +551,10 @@ func (r *ReconcileAppDbReplicaSet) ReconcileAppDB(ctx context.Context, opsManage
550551
// it's possible that Ops Manager will not be available when we attempt to configure AppDB monitoring
551552
// in Ops Manager. This is not a blocker to continue with the rest of the reconciliation.
552553
if err != nil {
554+
pendingErr := om.PendingErr{}
555+
if ok := errors.As(err, &pendingErr); ok {
556+
return r.updateStatus(ctx, opsManager, workflow.Pending(pendingErr.Error()), log, omStatusOption)
557+
}
553558
log.Errorf("Unable to configure monitoring of AppDB: %s, configuration will be attempted next reconciliation.", err)
554559

555560
if podVars.ProjectID != "" {

controllers/operator/authentication/authentication.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func Configure(conn om.Connection, opts Options, isRecovering bool, log *zap.Sug
9191
if isRecovering {
9292
return nil
9393
}
94-
return om.WaitForReadyState(conn, opts.ProcessNames, false, log)
94+
return om.CheckForReadyStateReturningError(conn, opts.ProcessNames, log)
9595
}
9696

9797
// we need to make sure the desired authentication mechanism for the agent exists. If the desired agent
@@ -172,6 +172,7 @@ func Disable(conn om.Connection, opts Options, deleteUsers bool, log *zap.Sugare
172172
return xerrors.Errorf("error read/updating automation config: %w", err)
173173
}
174174

175+
// Disable is also called from onDelete, where we cannot requeue, so we must wait here
175176
if err := om.WaitForReadyState(conn, opts.ProcessNames, false, log); err != nil {
176177
return xerrors.Errorf("error waiting for ready state: %w", err)
177178
}
@@ -222,7 +223,7 @@ func Disable(conn om.Connection, opts Options, deleteUsers bool, log *zap.Sugare
222223
return xerrors.Errorf("error read/updating backup agent config: %w", err)
223224
}
224225

225-
if err := om.WaitForReadyState(conn, opts.ProcessNames, false, log); err != nil {
226+
if err := om.CheckForReadyStateReturningError(conn, opts.ProcessNames, log); err != nil {
226227
return xerrors.Errorf("error waiting for ready state: %w", err)
227228
}
228229

controllers/operator/common_controller.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"encoding/pem"
7+
"errors"
78
"fmt"
89
"path/filepath"
910
"reflect"
@@ -427,9 +428,12 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,
427428
return workflow.Failed(err), false
428429
}
429430

430-
// we need to wait for all agents to be ready before configuring any authentication settings
431-
if err := om.WaitForReadyState(conn, processNames, isRecovering, log); err != nil {
432-
return workflow.Failed(err), false
431+
if !isRecovering {
432+
if workflowStatus := om.CheckForReadyState(conn, processNames, log); !workflowStatus.IsOK() {
433+
return workflowStatus, false
434+
}
435+
} else {
436+
log.Warnf("Ignoring checking for ready state due to recovering")
433437
}
434438

435439
clientCerts := util.OptionalClientCertficates
@@ -515,6 +519,10 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,
515519
}
516520

517521
if err := authentication.Configure(conn, authOpts, isRecovering, log); err != nil {
522+
pendingErr := om.PendingErr{}
523+
if ok := errors.As(err, &pendingErr); ok {
524+
return workflow.Pending(pendingErr.Error()), false
525+
}
518526
return workflow.Failed(err), false
519527
}
520528
} else if wantToEnableAuthentication {
@@ -534,6 +542,7 @@ func (r *ReconcileCommonController) updateOmAuthentication(ctx context.Context,
534542

535543
authOpts.UserOptions = userOpts
536544
if err := authentication.Disable(conn, authOpts, false, log); err != nil {
545+
537546
return workflow.Failed(err), false
538547
}
539548
}

controllers/operator/mongodbmultireplicaset_controller.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package operator
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"reflect"
89
"sort"
@@ -213,6 +214,10 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
213214
status := workflow.RunInGivenOrder(publishAutomationConfigFirst,
214215
func() workflow.Status {
215216
if err := r.updateOmDeploymentRs(ctx, conn, mrs, agentCertPath, tlsCertPath, internalClusterCertPath, false, log); err != nil {
217+
pendingErr := om.PendingErr{}
218+
if ok := errors.As(err, &pendingErr); ok {
219+
return workflow.Pending(pendingErr.Error())
220+
}
216221
return workflow.Failed(err)
217222
}
218223
return workflow.OK()
@@ -789,9 +794,14 @@ func (r *ReconcileMongoDbMultiReplicaSet) updateOmDeploymentRs(ctx context.Conte
789794
reachableProcessNames = append(reachableProcessNames, proc.Name())
790795
}
791796
}
792-
if err := om.WaitForReadyState(conn, reachableProcessNames, isRecovering, log); err != nil && !isRecovering {
797+
if isRecovering {
798+
return nil
799+
}
800+
801+
if err := om.CheckForReadyStateReturningError(conn, reachableProcessNames, log); err != nil {
793802
return err
794803
}
804+
795805
return nil
796806
}
797807

controllers/operator/mongodbreplicaset_controller.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package operator
22

33
import (
44
"context"
5+
goerrors "errors"
56
"fmt"
67

78
"go.uber.org/zap"
@@ -240,6 +241,10 @@ func (r *ReconcileMongoDbReplicaSet) Reconcile(ctx context.Context, request reco
240241

241242
if scale.ReplicasThisReconciliation(rs) < rs.Status.Members {
242243
if err := replicaset.PrepareScaleDownFromStatefulSet(conn, sts, rs, log); err != nil {
244+
pendingErr := om.PendingErr{}
245+
if ok := goerrors.As(err, &pendingErr); ok {
246+
return r.updateStatus(ctx, rs, workflow.Pending(pendingErr.Error()), log)
247+
}
243248
return r.updateStatus(ctx, rs, workflow.Failed(xerrors.Errorf("Failed to prepare Replica Set for scaling down using Ops Manager: %w", err)), log)
244249
}
245250
}
@@ -512,8 +517,12 @@ func (r *ReconcileMongoDbReplicaSet) updateOmDeploymentRs(ctx context.Context, c
512517
return workflow.Failed(err)
513518
}
514519

515-
if err := om.WaitForReadyState(conn, processNames, isRecovering, log); err != nil {
516-
return workflow.Failed(err)
520+
if !isRecovering {
521+
if workflowStatus := om.CheckForReadyState(conn, processNames, log); !workflowStatus.IsOK() {
522+
return workflowStatus
523+
}
524+
} else {
525+
log.Warnf("Ignoring checking for ready state due to recovering")
517526
}
518527

519528
reconcileResult, _ := ReconcileLogRotateSetting(conn, rs.Spec.Agent, log)

controllers/operator/mongodbshardedcluster_controller.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package operator
22

33
import (
44
"context"
5+
goerrors "errors"
56
"fmt"
7+
"k8s.io/apimachinery/pkg/api/errors"
68
"slices"
79
"sort"
810
"strings"
@@ -11,7 +13,6 @@ import (
1113
"github.com/hashicorp/go-multierror"
1214
"go.uber.org/zap"
1315
"golang.org/x/xerrors"
14-
"k8s.io/apimachinery/pkg/api/errors"
1516
"k8s.io/apimachinery/pkg/runtime"
1617
"k8s.io/apimachinery/pkg/util/intstr"
1718
"k8s.io/utils/ptr"
@@ -1611,6 +1612,7 @@ func (r *ShardedClusterReconcileHelper) cleanOpsManagerState(ctx context.Context
16111612
}
16121613

16131614
logDiffOfProcessNames(processNames, r.getHealthyProcessNames(), log.With("ctx", "cleanOpsManagerState"))
1615+
// we are in the onDelete path and cannot requeue, so we need to wait
16141616
if err := om.WaitForReadyState(conn, r.getHealthyProcessNames(), false, log); err != nil {
16151617
return err
16161618
}
@@ -1849,13 +1851,12 @@ func (r *ShardedClusterReconcileHelper) updateOmDeploymentShardedCluster(ctx con
18491851

18501852
healthyProcessesToWaitForReadyState := r.getHealthyProcessNamesToWaitForReadyState(conn, log)
18511853
logDiffOfProcessNames(processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "updateOmDeploymentShardedCluster"))
1852-
if err = om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
1853-
if !isRecovering {
1854-
if shardsRemoving {
1855-
return workflow.Pending("automation agents haven't reached READY state: shards removal in progress: %v", err)
1856-
}
1857-
return workflow.Failed(err)
1854+
1855+
if !isRecovering {
1856+
if workflowStatus := om.CheckForReadyState(conn, healthyProcessesToWaitForReadyState, log); !workflowStatus.IsOK() {
1857+
return workflowStatus
18581858
}
1859+
} else {
18591860
logWarnIgnoredDueToRecovery(log, err)
18601861
}
18611862

@@ -1873,12 +1874,16 @@ func (r *ShardedClusterReconcileHelper) updateOmDeploymentShardedCluster(ctx con
18731874

18741875
healthyProcessesToWaitForReadyState := r.getHealthyProcessNamesToWaitForReadyState(conn, log)
18751876
logDiffOfProcessNames(processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "shardsRemoving"))
1876-
if err = om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
1877-
if !isRecovering {
1878-
return workflow.Failed(xerrors.Errorf("automation agents haven't reached READY state while cleaning replica set and processes: %w", err))
1879-
}
1877+
if isRecovering {
18801878
logWarnIgnoredDueToRecovery(log, err)
18811879
}
1880+
if err = om.CheckForReadyStateReturningError(conn, healthyProcessesToWaitForReadyState, log); err != nil {
1881+
pendingErr := om.PendingErr{}
1882+
if ok := goerrors.As(err, &pendingErr); ok {
1883+
return workflow.Pending(pendingErr.Error())
1884+
}
1885+
return workflow.Failed(err)
1886+
}
18821887
}
18831888

18841889
currentHosts := r.getAllHostnames(false)
@@ -2042,8 +2047,13 @@ func (r *ShardedClusterReconcileHelper) publishDeployment(ctx context.Context, c
20422047

20432048
healthyProcessesToWaitForReadyState = r.getHealthyProcessNamesToWaitForReadyState(conn, log)
20442049
logDiffOfProcessNames(opts.processNames, healthyProcessesToWaitForReadyState, log.With("ctx", "publishDeployment"))
2045-
if err := om.WaitForReadyState(conn, healthyProcessesToWaitForReadyState, isRecovering, log); err != nil {
2046-
return nil, shardsRemoving, workflow.Failed(err)
2050+
2051+
if !isRecovering {
2052+
if workflowStatus := om.CheckForReadyState(conn, healthyProcessesToWaitForReadyState, log); workflowStatus != workflow.OK() {
2053+
return nil, shardsRemoving, workflowStatus
2054+
}
2055+
} else {
2056+
log.Warnf("Ignoring checking for ready state due to recovering")
20472057
}
20482058

20492059
if additionalReconciliationRequired {

controllers/operator/mongodbstandalone_controller.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,12 @@ func (r *ReconcileMongoDbStandalone) updateOmDeployment(ctx context.Context, con
353353
return workflow.Failed(err)
354354
}
355355

356-
if err := om.WaitForReadyState(conn, []string{set.Name}, isRecovering, log); err != nil {
357-
return workflow.Failed(err)
356+
if !isRecovering {
357+
if workflowStatus := om.CheckForReadyState(conn, []string{set.Name}, log); status != workflow.OK() {
358+
return workflowStatus
359+
}
360+
} else {
361+
log.Warnf("Ignoring checking for ready state due to recovering")
358362
}
359363

360364
if additionalReconciliationRequired {

controllers/operator/mongodbuser_controller.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -403,8 +403,8 @@ func (r *MongoDBUserReconciler) handleScramShaUser(ctx context.Context, user *us
403403
// Before we update the MongoDBUser's status to Updated,
404404
// we need to wait for the cluster to be in a ready state
405405
// to ensure that the user has been created successfully and is usable.
406-
if err := waitForReadyState(conn, log); err != nil {
407-
return r.updateStatus(ctx, user, workflow.Pending("error waiting for ready state: %s", err.Error()).WithRetry(10), log)
406+
if workflowStatus := checkForReadyState(conn, log); !workflowStatus.IsOK() {
407+
return r.updateStatus(ctx, user, workflowStatus, log)
408408
}
409409

410410
annotationsToAdd, err := getAnnotationsForUserResource(user)
@@ -453,8 +453,8 @@ func (r *MongoDBUserReconciler) handleExternalAuthUser(ctx context.Context, user
453453
// Before we update the MongoDBUser's status to Updated,
454454
// we need to wait for the cluster to be in a ready state
455455
// to ensure that the user has been created successfully and is usable.
456-
if err := waitForReadyState(conn, log); err != nil {
457-
return r.updateStatus(ctx, user, workflow.Pending("error waiting for ready state: %s", err.Error()).WithRetry(10), log)
456+
if err := checkForReadyState(conn, log); err != nil {
457+
return r.updateStatus(ctx, user, err, log)
458458
}
459459

460460
annotationsToAdd, err := getAnnotationsForUserResource(user)
@@ -470,14 +470,14 @@ func (r *MongoDBUserReconciler) handleExternalAuthUser(ctx context.Context, user
470470
return r.updateStatus(ctx, user, workflow.OK(), log)
471471
}
472472

473-
func waitForReadyState(conn om.Connection, log *zap.SugaredLogger) error {
473+
func checkForReadyState(conn om.Connection, log *zap.SugaredLogger) workflow.Status {
474474
automationConfig, err := conn.ReadAutomationConfig()
475475
if err != nil {
476-
return err
476+
return workflow.Failed(err)
477477
}
478478

479479
processes := automationConfig.Deployment.GetAllProcessNames()
480-
return om.WaitForReadyState(conn, processes, false, log)
480+
return om.CheckForReadyState(conn, processes, log)
481481
}
482482

483483
func externalAuthMechanismsAvailable(mechanisms []string) bool {

0 commit comments

Comments
 (0)