From f64388ce3ae2a922e2435d616e4090ce8c4d45f9 Mon Sep 17 00:00:00 2001 From: baoyinghai_yewu Date: Thu, 19 Jun 2025 09:17:01 +0800 Subject: [PATCH 1/2] fix: update vc status to pending while assignWorkNodes error Signed-off-by: baoyinghai_yewu --- .../virtualcluster_init_controller.go | 46 ++++++++----------- 1 file changed, 20 insertions(+), 26 deletions(-) diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 1c4048ad2..c5dc4aeb1 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -120,7 +120,7 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re klog.V(2).InfoS("Virtual Cluster has been deleted", "Virtual Cluster", request) return reconcile.Result{}, nil } - return reconcile.Result{RequeueAfter: RequeueTime}, nil + return reconcile.Result{}, nil } updatedCluster := originalCluster.DeepCopy() updatedCluster.Status.Reason = "" @@ -132,44 +132,35 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re err := c.Update(updatedCluster) if err != nil { klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } - if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil { + if len(updatedCluster.Spec.Kubeconfig) == 0 { + klog.Warningf("virtualcluster.spec.kubeconfig is nil, but status is deleting. %s %s.", updatedCluster.Name, updatedCluster.Status.Phase) + } else if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil { updatedCluster.Status.Phase = v1alpha1.Pending updatedCluster.Status.Reason = err.Error() err := c.Update(updatedCluster) if err != nil { klog.Errorf("Error delete virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error delete virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error delete virtualcluster %s status", updatedCluster.Name) } - return reconcile.Result{}, err + return reconcile.Result{RequeueAfter: RequeueTime}, err } - // if err = c.nodeManager.NodeDelete(ctx, *updatedCluster); err != nil { - // updatedCluster.Status.Phase = v1alpha1.Pending - // updatedCluster.Status.Reason = err.Error() - // err := c.Update(updatedCluster) - // if err != nil { - // klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - // return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) - // } - // return reconcile.Result{}, err - // } - klog.V(2).Infof(" all node is deleted, vc: %s", updatedCluster.Name) updatedCluster.Status.Phase = v1alpha1.AllNodeDeleted err = c.Update(updatedCluster) if err != nil { klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } err = c.destroyVirtualCluster(updatedCluster) if err != nil { klog.Errorf("Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error()) - return reconcile.Result{}, errors.Wrapf(err, "Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error()) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error()) } return c.removeFinalizer(updatedCluster) } @@ -192,15 +183,15 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re err := c.Update(updatedCluster) if err != nil { klog.Errorf("Error update virtualcluster %s. err: %s", updatedCluster.Name, err.Error()) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } - return reconcile.Result{}, errors.Wrap(err, "Error createVirtualCluster") + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrap(err, "Error createVirtualCluster") } updatedCluster.Status.Phase = v1alpha1.Initialized err = c.Update(updatedCluster) if err != nil { klog.Errorf("Error update virtualcluster %s status to %s. %v", updatedCluster.Name, updatedCluster.Status.Phase, err) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil { updatedCluster.Status.Phase = v1alpha1.Pending @@ -208,9 +199,9 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re err := c.Update(updatedCluster) if err != nil { klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } - return reconcile.Result{}, err + return reconcile.Result{RequeueAfter: RequeueTime}, err } name, namespace := request.Name, request.Namespace return reconcile.Result{}, c.DoAllNodeReadyCheck(name, namespace, originalCluster, updatedCluster) @@ -229,13 +220,16 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re } err = c.assignWorkNodes(updatedCluster) if err != nil { + updatedCluster.Status.Phase = v1alpha1.Pending + updatedCluster.Status.Reason = err.Error() + err := c.Update(updatedCluster) return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name) } updatedCluster.Status.Phase = v1alpha1.Updating err = c.Update(updatedCluster) if err != nil { klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil { updatedCluster.Status.Phase = v1alpha1.Pending @@ -243,9 +237,9 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re err := c.Update(updatedCluster) if err != nil { klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } - return reconcile.Result{}, err + return reconcile.Result{RequeueAfter: RequeueTime}, err } name, namespace := request.Name, request.Namespace return reconcile.Result{}, c.DoAllNodeReadyCheck(name, namespace, originalCluster, updatedCluster) From b9c09006a1db84c6ab77d32784b58d675e73f042 Mon Sep 17 00:00:00 2001 From: baoyinghai_yewu Date: Mon, 30 Jun 2025 10:29:18 +0800 Subject: [PATCH 2/2] feat: add etcd export Signed-off-by: baoyinghai_yewu --- pkg/kubenest/constants/constant.go | 7 ++ pkg/kubenest/init.go | 2 + pkg/kubenest/tasks/coredns.go | 8 +- pkg/kubenest/tasks/features.go | 155 +++++++++++++++++++++++++++++ 4 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 pkg/kubenest/tasks/features.go diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index 917c1fb44..d0e9c6e4c 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -143,6 +143,13 @@ const ( NodeLocalDNSIp = "169.254.20.10" NodeLocalDNSClusterDomain = "cluster.local" NodeLocalDNSService = "__PILLAR__DNS__SERVER__" + + EnabledFeaturesAnnotation = "kosmos.io/enabled-features" + FeatureComponents = "feature-components" +) + +const ( + ExposeEtcd = "expose-etcd" ) type Action string diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index 447fdcf15..fdc4bbacd 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -74,6 +74,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase { // add server initPhase.AppendTask(tasks.NewComponentsFromManifestsTask()) initPhase.AppendTask(tasks.NewEndPointTask()) + initPhase.AppendTask(tasks.NewFeaturesTask()) initPhase.SetDataInitializer(func() (workflow.RunData, error) { return newRunData(opts) @@ -104,6 +105,7 @@ func UninstallPhase(opts *InitOptions) *workflow.Phase { destroyPhase.AppendTask(tasks.UninstallVirtualClusterServiceTask()) destroyPhase.AppendTask(tasks.UninstallCertsAndKubeconfigTask()) destroyPhase.AppendTask(tasks.DeleteEtcdPvcTask()) + destroyPhase.AppendTask(tasks.DeleteFeaturesTask()) //destroyPhase.AppendTask(tasks.UninstallVirtualClusterProxyTask()) destroyPhase.SetDataInitializer(func() (workflow.RunData, error) { diff --git a/pkg/kubenest/tasks/coredns.go b/pkg/kubenest/tasks/coredns.go index a56fd992d..445723e7d 100644 --- a/pkg/kubenest/tasks/coredns.go +++ b/pkg/kubenest/tasks/coredns.go @@ -84,7 +84,7 @@ func UninstallCoreDNSTask() workflow.Task { } } -func getCoreDNSHostComponentsConfig(client clientset.Interface, keyName string) ([]ComponentConfig, error) { +func getManifestComponentsConfig(client clientset.Interface, keyName string) ([]ComponentConfig, error) { cm, err := client.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.Background(), constants.ManifestComponentsConfigMap, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { @@ -115,7 +115,7 @@ func runCoreDNSHostTask(r workflow.RunData) error { dynamicClient := data.DynamicClient() - components, err := getCoreDNSHostComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents) + components, err := getManifestComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents) if err != nil { return err } @@ -149,7 +149,7 @@ func uninstallCorednsHostTask(r workflow.RunData) error { dynamicClient := data.DynamicClient() - components, err := getCoreDNSHostComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents) + components, err := getManifestComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents) if err != nil { return err } @@ -219,7 +219,7 @@ func runCoreDNSVirtualTask(r workflow.RunData) error { return err } - components, err := getCoreDNSHostComponentsConfig(data.RemoteClient(), constants.VirtualCoreDNSComponents) + components, err := getManifestComponentsConfig(data.RemoteClient(), constants.VirtualCoreDNSComponents) if err != nil { return err } diff --git a/pkg/kubenest/tasks/features.go b/pkg/kubenest/tasks/features.go new file mode 100644 index 000000000..0e5a72bf8 --- /dev/null +++ b/pkg/kubenest/tasks/features.go @@ -0,0 +1,155 @@ +package tasks + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" + "github.com/kosmos.io/kosmos/pkg/kubenest/workflow" +) + +func NewFeaturesTask() workflow.Task { + return workflow.Task{ + Name: "install_features", + Run: runFeatures, + RunSubTasks: true, + } +} + +func DeleteFeaturesTask() workflow.Task { + return workflow.Task{ + Name: "uninstall_features", + Run: cleanFeatures, + RunSubTasks: true, + } +} + +func doExportEtcd(data InitData, components []ComponentConfig) error { + component, err := findComponent(components, constants.ExposeEtcd) + if err != nil { + return err + } + + dynamicClient := data.DynamicClient() + imageRepository, _ := util.GetImageMessage() + + klog.V(2).Infof("Deploy component %s", component.Name) + templatedMapping := map[string]interface{}{ + "Namespace": data.GetNamespace(), + "Name": data.GetName(), + "ImageRepository": imageRepository, + "EtcdListenClientPort": constants.EtcdListenClientPort, + } + for k, v := range data.PluginOptions() { + templatedMapping[k] = v + } + + err = applyYMLTemplate(dynamicClient, component.Path, templatedMapping) + if err != nil { + return err + } + + return nil +} + +func findComponent(components []ComponentConfig, name string) (ComponentConfig, error) { + for _, compcomponent := range components { + if compcomponent.Name == name { + return compcomponent, nil + } + } + + return ComponentConfig{}, fmt.Errorf("component %s not found", name) +} + +func runFeatures(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("features task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[features] Running feature task", "virtual cluster", klog.KObj(data)) + + vc := data.VirtualCluster() + features := strings.Split(vc.Annotations[constants.EnabledFeaturesAnnotation], ",") + + components, err := getManifestComponentsConfig(data.RemoteClient(), constants.FeatureComponents) + if err != nil { + return err + } + + for _, f := range features { + switch strings.TrimSpace(f) { + case constants.ExposeEtcd: + if err := doExportEtcd(data, components); err != nil { + return err + } + default: + klog.V(2).InfoS("[features] Unknown feature", "feature", f) + } + } + + return nil +} + +func cleanExportEtcd(data InitData, components []ComponentConfig) error { + component, err := findComponent(components, constants.ExposeEtcd) + if err != nil { + return err + } + + dynamicClient := data.DynamicClient() + imageRepository, _ := util.GetImageMessage() + + klog.V(2).Infof("Delete component %s", component.Name) + templatedMapping := map[string]interface{}{ + "Namespace": data.GetNamespace(), + "Name": data.GetName(), + "ImageRepository": imageRepository, + "EtcdListenClientPort": constants.EtcdListenClientPort, + } + for k, v := range data.PluginOptions() { + templatedMapping[k] = v + } + + err = deleteYMLTemplate(dynamicClient, component.Path, templatedMapping) + if err != nil { + return err + } + + return nil +} + +func cleanFeatures(r workflow.RunData) error { + data, ok := r.(InitData) + if !ok { + return errors.New("features task invoked with an invalid data struct") + } + + klog.V(4).InfoS("[features] Running feature task", "virtual cluster", klog.KObj(data)) + + vc := data.VirtualCluster() + features := strings.Split(vc.Annotations[constants.EnabledFeaturesAnnotation], ",") + + components, err := getManifestComponentsConfig(data.RemoteClient(), constants.FeatureComponents) + if err != nil { + return err + } + + for _, f := range features { + switch strings.TrimSpace(f) { + case constants.ExposeEtcd: + if err := cleanExportEtcd(data, components); err != nil { + return err + } + default: + klog.V(2).InfoS("[features] Unknown feature", "feature", f) + } + } + + return nil +}