Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/controllers/deployment_cluster_to_standalone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package controllers

import (
"context"

pkgerr "github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/zilliztech/milvus-operator/apis/milvus.io/v1beta1"
)

func (r *MilvusReconciler) CleanupDeploymentClusterToStandalone(ctx context.Context, mc v1beta1.Milvus) error {
logger := ctrl.LoggerFrom(ctx)

if mc.Spec.Mode != v1beta1.MilvusModeStandalone || mc.Spec.Com.EnableManualMode {
return nil
}
// If mode is standalone, we need to ensure all other component deployments are scaled down

// List all deployments with the instance label
deploymentList := &appsv1.DeploymentList{}
opts := &client.ListOptions{
Namespace: mc.Namespace,
LabelSelector: labels.SelectorFromSet(NewAppLabels(mc.Name)),
}

if err := r.List(ctx, deploymentList, opts); err != nil {
return pkgerr.Wrap(err, "list deployments by instance label")
}
var nonStandaloneDeployments []appsv1.Deployment
for i := range deploymentList.Items {
deployment := &deploymentList.Items[i]

// Skip the standalone deployments by checking the component label
if deployment.Labels != nil && deployment.Labels[AppLabelComponent] == MilvusStandalone.Name {
continue
}
nonStandaloneDeployments = append(nonStandaloneDeployments, *deployment)
}
if len(nonStandaloneDeployments) == 0 {
// No non-standalone deployments found
return nil
}
logger.Info("Found non-standalone deployments to delete, checking standalone deployment readiness")

// Check if standalone deployment exists and is ready
// Standalone may use 2 deployment mode, so list all standalone deployments
standaloneDeploymentList := &appsv1.DeploymentList{}
standaloneOpts := &client.ListOptions{
Namespace: mc.Namespace,
LabelSelector: labels.SelectorFromSet(NewComponentAppLabels(
mc.Name,
MilvusStandalone.Name,
)),
}

if err := r.List(ctx, standaloneDeploymentList, standaloneOpts); err != nil {
return pkgerr.Wrap(err, "list standalone deployments")
}

if len(standaloneDeploymentList.Items) == 0 {
// If standalone deployment doesn't exist yet, we can't proceed
logger.V(1).Info("Standalone deployment not found, skip cluster to standalone cleanup")
return nil
}

// Check if all standalone deployments are ready
allStandaloneReady := true
for i := range standaloneDeploymentList.Items {
if !DeploymentReady(standaloneDeploymentList.Items[i].Status) {
allStandaloneReady = false
break
}
}

if !allStandaloneReady {
logger.V(1).Info("Standalone deployment not ready yet, skip cluster to standalone cleanup")
return nil
}

logger.Info("Standalone deployment is ready, delete cluster component deployments")
for i := range nonStandaloneDeployments {
deployment := &nonStandaloneDeployments[i]
err := r.Delete(ctx, deployment)
if client.IgnoreNotFound(err) != nil {
return pkgerr.Wrapf(err, "delete deployment %s/%s", deployment.Namespace, deployment.Name)
}
logger.Info("Deleted deployment", "deployment", deployment.Name)
}

return nil
}
214 changes: 214 additions & 0 deletions pkg/controllers/deployment_cluster_to_standalone_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package controllers

import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/zilliztech/milvus-operator/apis/milvus.io/v1beta1"
)

// Helper functions for test setup
func newDeployment(name, component string, ready bool, replicas *int32) appsv1.Deployment {
deploy := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "ns",
Labels: map[string]string{
AppLabelInstance: "mc",
AppLabelComponent: component,
},
},
}
if replicas != nil {
deploy.Spec.Replicas = replicas
}
if ready {
deploy.Status = appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionTrue,
},
},
}
}
return deploy
}

func mockListDeployments(deployments []appsv1.Deployment) func(interface{}, interface{}, ...interface{}) error {
return func(ctx interface{}, list interface{}, opts ...interface{}) error {
deployList := list.(*appsv1.DeploymentList)
deployList.Items = deployments
return nil
}
}

func TestMilvusReconciler_ReconcileDeploymentClusterToStandalone(t *testing.T) {
env := newTestEnv(t)
defer env.checkMocks()
r := env.Reconciler
mockClient := env.MockClient
ctx := env.ctx

t.Run("cluster mode - should skip", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeCluster

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
})

t.Run("manual mode - should skip", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeStandalone
mc.Spec.Com.EnableManualMode = true

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
})

t.Run("standalone mode - no non-standalone deployments", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeStandalone

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone", MilvusStandalone.Name, false, nil),
}))

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
})

t.Run("standalone mode - has non-standalone deployments but standalone not ready", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeStandalone

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone", MilvusStandalone.Name, false, nil),
newDeployment("mc-milvus-proxy", Proxy.Name, false, int32Ptr(1)),
}))

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone", MilvusStandalone.Name, false, nil),
}))

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
})

t.Run("standalone mode - has non-standalone deployments and standalone is ready - should delete", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeStandalone

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone", MilvusStandalone.Name, false, nil),
newDeployment("mc-milvus-proxy", Proxy.Name, false, int32Ptr(1)),
newDeployment("mc-milvus-datanode", DataNode.Name, false, int32Ptr(2)),
}))

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone", MilvusStandalone.Name, true, nil),
}))

deletedDeployments := make(map[string]bool)
mockClient.EXPECT().Delete(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx interface{}, obj interface{}, opts ...interface{}) error {
deploy := obj.(*appsv1.Deployment)
deletedDeployments[deploy.Name] = true
return nil
}).Times(2)

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
assert.True(t, deletedDeployments["mc-milvus-proxy"])
assert.True(t, deletedDeployments["mc-milvus-datanode"])
})

t.Run("standalone mode - standalone deployment not found yet", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeStandalone

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-proxy", Proxy.Name, false, nil),
}))

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{}))

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
})

t.Run("standalone mode - 2 deployment mode with both ready", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeStandalone

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone-0", MilvusStandalone.Name, false, nil),
newDeployment("mc-milvus-standalone-1", MilvusStandalone.Name, false, nil),
newDeployment("mc-milvus-proxy", Proxy.Name, false, int32Ptr(1)),
}))

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone-0", MilvusStandalone.Name, true, nil),
newDeployment("mc-milvus-standalone-1", MilvusStandalone.Name, true, nil),
}))

mockClient.EXPECT().Delete(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx interface{}, obj interface{}, opts ...interface{}) error {
deploy := obj.(*appsv1.Deployment)
assert.Equal(t, "mc-milvus-proxy", deploy.Name)
return nil
})

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
})

t.Run("standalone mode - multiple deployments of same component should all be deleted", func(t *testing.T) {
mc := env.Inst.DeepCopy()
mc.Spec.Mode = v1beta1.MilvusModeStandalone

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone", MilvusStandalone.Name, false, nil),
newDeployment("mc-milvus-querynode-0", QueryNode.Name, false, int32Ptr(1)),
newDeployment("mc-milvus-querynode-1", QueryNode.Name, false, int32Ptr(1)),
newDeployment("mc-milvus-datanode-0", DataNode.Name, false, int32Ptr(2)),
newDeployment("mc-milvus-datanode-1", DataNode.Name, false, int32Ptr(2)),
}))

mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).
DoAndReturn(mockListDeployments([]appsv1.Deployment{
newDeployment("mc-milvus-standalone", MilvusStandalone.Name, true, nil),
}))

deletedDeployments := make(map[string]bool)
mockClient.EXPECT().Delete(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx interface{}, obj interface{}, opts ...interface{}) error {
deploy := obj.(*appsv1.Deployment)
deletedDeployments[deploy.Name] = true
return nil
}).Times(4)

err := r.CleanupDeploymentClusterToStandalone(ctx, *mc)
assert.NoError(t, err)
assert.True(t, deletedDeployments["mc-milvus-querynode-0"])
assert.True(t, deletedDeployments["mc-milvus-querynode-1"])
assert.True(t, deletedDeployments["mc-milvus-datanode-0"])
assert.True(t, deletedDeployments["mc-milvus-datanode-1"])
})
}
5 changes: 5 additions & 0 deletions pkg/controllers/deployments.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ func (r *MilvusReconciler) ReconcileDeployments(ctx context.Context, mc v1beta1.
return fmt.Errorf("reconcile milvus deployments errs: %w", errors.Join(errs...))
}

err = r.CleanupDeploymentClusterToStandalone(ctx, mc)
if err != nil {
return err
}

err = r.cleanupIndexNodeIfNeeded(ctx, mc)
if err != nil {
return err
Expand Down
Loading