Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 28 additions & 5 deletions pkg/controllers/deploy_ctrl_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (c *DeployControllerBizUtilImpl) ScaleDeployments(ctx context.Context, mc v
if err != nil {
return err
}
action := c.planNextScaleAction(mc, currentDeployment, lastDeployment)
action := c.planNextScaleAction(ctx, mc, currentDeployment, lastDeployment)
if action != noScaleAction {
ctrl.LoggerFrom(ctx).Info("do scale action", "deployName", action.deploy.Name, "replicaChange", action.replicaChange, "isCurrentDeploy", action.deploy == lastDeployment)
}
Expand Down Expand Up @@ -328,13 +328,13 @@ type scaleAction struct {

var noScaleAction = scaleAction{}

func (c *DeployControllerBizUtilImpl) planNextScaleAction(mc v1beta1.Milvus, currentDeployment, lastDeployment *appsv1.Deployment) scaleAction {
func (c *DeployControllerBizUtilImpl) planNextScaleAction(ctx context.Context, mc v1beta1.Milvus, currentDeployment, lastDeployment *appsv1.Deployment) scaleAction {
scaleKind := c.checkScaleKind(mc, lastDeployment)
currentDeployment.Spec.Strategy = GetDeploymentStrategy(&mc, c.component)
lastDeployment.Spec.Strategy = GetDeploymentStrategy(&mc, c.component)
switch scaleKind {
case scaleKindHPA:
return c.planScaleForHPA(currentDeployment)
return c.planScaleForHPA(ctx, mc, currentDeployment, lastDeployment)
case scaleKindRollout:
return c.planScaleForRollout(mc, currentDeployment, lastDeployment)
case scaleKindForce:
Expand Down Expand Up @@ -368,12 +368,35 @@ func (c *DeployControllerBizUtilImpl) checkScaleKind(mc v1beta1.Milvus, lastDepl
return scaleKindNormal
}

// planScaleForHPA assumes epectedReplicas < 0
func (c *DeployControllerBizUtilImpl) planScaleForHPA(currentDeployment *appsv1.Deployment) scaleAction {
func (c *DeployControllerBizUtilImpl) planScaleForHPA(ctx context.Context, mc v1beta1.Milvus, currentDeployment, lastDeployment *appsv1.Deployment) scaleAction {
currentDeployReplicas := getDeployReplicas(currentDeployment)
lastDeployReplicas := getDeployReplicas(lastDeployment)

// Bootstrap current deployment from 0 (HPA can't scale from 0)
if currentDeployReplicas == 0 {
return scaleAction{deploy: currentDeployment, replicaChange: 1}
}

isRolling := v1beta1.Labels().IsComponentRolling(mc, c.component.Name)

// During rolling update, scale down old deployment once new one is ready
if isRolling && lastDeployReplicas > 0 {
// Wait until current deployment has at least as many ready replicas as old deploymen
if currentDeployment.Status.ReadyReplicas >= int32(lastDeployReplicas) {
ctrl.LoggerFrom(ctx).Info("scaling down old deployment during HPA rolling update",
"oldDeployment", lastDeployment.Name,
"oldReplicas", lastDeployReplicas,
"currentDeployment", currentDeployment.Name,
"currentReadyReplicas", currentDeployment.Status.ReadyReplicas)
return scaleAction{deploy: lastDeployment, replicaChange: -lastDeployReplicas}
}

ctrl.LoggerFrom(ctx).V(1).Info("waiting for current deployment to have enough ready replicas",
"currentDeployment", currentDeployment.Name,
"currentReadyReplicas", currentDeployment.Status.ReadyReplicas,
"requiredReplicas", lastDeployReplicas)
}

return noScaleAction
}

Expand Down
50 changes: 47 additions & 3 deletions pkg/controllers/deploy_ctrl_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,49 @@ func TestDeployControllerBizUtilImpl_ScaleDeployements(t *testing.T) {
assert.Equal(t, int32(1), *currentDeploy.Spec.Replicas)
})

t.Run("hpa rolling update, scale down old deployment when current ready", func(t *testing.T) {
mockCtrl.Finish()
mc := *milvus.DeepCopy()
mc.Spec.Com.DataNode.Replicas = int32Ptr(-1)
currentDeploy := deployTemplate.DeepCopy()
v1beta1.Labels().SetGroupIDStr(DataNodeName, currentDeploy.Labels, "1")
lastDeploy := deployTemplate.DeepCopy()
lastDeploy.Spec.Replicas = int32Ptr(3)
currentDeploy.Spec.Replicas = int32Ptr(5)
currentDeploy.Status.ReadyReplicas = 3 // Equal to old deployment replicas
mockutil.EXPECT().MarkMilvusComponentGroupId(ctx, mc, DataNode, 1).Return(nil)
mockutil.EXPECT().ListDeployPods(ctx, lastDeploy, DataNode).Return(pods, nil)
mockutil.EXPECT().DeploymentIsStable(lastDeploy, pods).Return(true, "")
mockutil.EXPECT().ListDeployPods(ctx, currentDeploy, DataNode).Return(currentPods, nil)
mockutil.EXPECT().DeploymentIsStable(currentDeploy, currentPods).Return(true, "")
mockutil.EXPECT().UpdateAndRequeue(ctx, gomock.Any()).Return(ErrRequeue)
err := bizUtil.ScaleDeployments(ctx, mc, currentDeploy, lastDeploy)
assert.True(t, errors.Is(err, ErrRequeue))
// Old deployment should be scaled down to 0 all at once
assert.Equal(t, int32(0), *lastDeploy.Spec.Replicas)
})

t.Run("hpa rolling update, wait for current ready before scale down", func(t *testing.T) {
mockCtrl.Finish()
mc := *milvus.DeepCopy()
mc.Spec.Com.DataNode.Replicas = int32Ptr(-1)
currentDeploy := deployTemplate.DeepCopy()
v1beta1.Labels().SetGroupIDStr(DataNodeName, currentDeploy.Labels, "1")
lastDeploy := deployTemplate.DeepCopy()
lastDeploy.Spec.Replicas = int32Ptr(3)
currentDeploy.Spec.Replicas = int32Ptr(2)
currentDeploy.Status.ReadyReplicas = 2 // Less than old deployment replicas
mockutil.EXPECT().MarkMilvusComponentGroupId(ctx, mc, DataNode, 1).Return(nil)
mockutil.EXPECT().ListDeployPods(ctx, lastDeploy, DataNode).Return(pods, nil)
mockutil.EXPECT().DeploymentIsStable(lastDeploy, pods).Return(true, "")
mockutil.EXPECT().ListDeployPods(ctx, currentDeploy, DataNode).Return(currentPods, nil)
mockutil.EXPECT().DeploymentIsStable(currentDeploy, currentPods).Return(true, "")
err := bizUtil.ScaleDeployments(ctx, mc, currentDeploy, lastDeploy)
assert.NoError(t, err)
// Old deployment should NOT be scaled down yet
assert.Equal(t, int32(3), *lastDeploy.Spec.Replicas)
})

t.Run("rollout finished", func(t *testing.T) {
mockCtrl.Finish()
mc := *milvus.DeepCopy()
Expand Down Expand Up @@ -1378,6 +1421,7 @@ func TestDeployControllerBizUtilImpl_planNextScaleAction(t *testing.T) {
}
mc := v1beta1.Milvus{}
mc.Spec.Com.Standalone = &v1beta1.MilvusStandalone{}
ctx := context.Background()

currentDeploy := new(appsv1.Deployment)
lastDeploy := new(appsv1.Deployment)
Expand All @@ -1388,7 +1432,7 @@ func TestDeployControllerBizUtilImpl_planNextScaleAction(t *testing.T) {
mc.Spec.Com.ImageUpdateMode = v1beta1.ImageUpdateModeForce
*expected = int32Ptr(3)
*currentReplicas = int32Ptr(0)
action := bizUtil.planNextScaleAction(mc, currentDeploy, lastDeploy)
action := bizUtil.planNextScaleAction(ctx, mc, currentDeploy, lastDeploy)
assert.Equal(t, scaleAction{
deploy: currentDeploy,
replicaChange: 3,
Expand All @@ -1400,7 +1444,7 @@ func TestDeployControllerBizUtilImpl_planNextScaleAction(t *testing.T) {
*expected = int32Ptr(3)
*currentReplicas = int32Ptr(3)
*lastReplicas = int32Ptr(3)
action := bizUtil.planNextScaleAction(mc, currentDeploy, lastDeploy)
action := bizUtil.planNextScaleAction(ctx, mc, currentDeploy, lastDeploy)
assert.Equal(t, scaleAction{
deploy: lastDeploy,
replicaChange: -3,
Expand All @@ -1412,7 +1456,7 @@ func TestDeployControllerBizUtilImpl_planNextScaleAction(t *testing.T) {
*expected = int32Ptr(3)
*currentReplicas = int32Ptr(3)
*lastReplicas = int32Ptr(0)
action := bizUtil.planNextScaleAction(mc, currentDeploy, lastDeploy)
action := bizUtil.planNextScaleAction(ctx, mc, currentDeploy, lastDeploy)
assert.Equal(t, noScaleAction, action)
})
}
Expand Down