Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/kubenest/constants/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ const (
NodeLocalDNSIp = "169.254.20.10"
NodeLocalDNSClusterDomain = "cluster.local"
NodeLocalDNSService = "__PILLAR__DNS__SERVER__"

EnabledFeaturesAnnotation = "kosmos.io/enabled-features"
FeatureComponents = "feature-components"
)

const (
ExposeEtcd = "expose-etcd"
)

type Action string
Expand Down
46 changes: 20 additions & 26 deletions pkg/kubenest/controller/virtualcluster_init_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re
klog.V(2).InfoS("Virtual Cluster has been deleted", "Virtual Cluster", request)
return reconcile.Result{}, nil
}
return reconcile.Result{RequeueAfter: RequeueTime}, nil
return reconcile.Result{}, nil
}
updatedCluster := originalCluster.DeepCopy()
updatedCluster.Status.Reason = ""
Expand All @@ -132,44 +132,35 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re
err := c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
}

if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil {
if len(updatedCluster.Spec.Kubeconfig) == 0 {
klog.Warningf("virtualcluster.spec.kubeconfig is nil, but status is deleting. %s %s.", updatedCluster.Name, updatedCluster.Status.Phase)
} else if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil {
updatedCluster.Status.Phase = v1alpha1.Pending
updatedCluster.Status.Reason = err.Error()
err := c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error delete virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
return reconcile.Result{}, errors.Wrapf(err, "Error delete virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error delete virtualcluster %s status", updatedCluster.Name)
}
return reconcile.Result{}, err
return reconcile.Result{RequeueAfter: RequeueTime}, err
}

// if err = c.nodeManager.NodeDelete(ctx, *updatedCluster); err != nil {
// updatedCluster.Status.Phase = v1alpha1.Pending
// updatedCluster.Status.Reason = err.Error()
// err := c.Update(updatedCluster)
// if err != nil {
// klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
// return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
// }
// return reconcile.Result{}, err
// }

klog.V(2).Infof(" all node is deleted, vc: %s", updatedCluster.Name)

updatedCluster.Status.Phase = v1alpha1.AllNodeDeleted
err = c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
}

err = c.destroyVirtualCluster(updatedCluster)
if err != nil {
klog.Errorf("Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error())
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error())
}
return c.removeFinalizer(updatedCluster)
}
Expand All @@ -192,25 +183,25 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re
err := c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error update virtualcluster %s. err: %s", updatedCluster.Name, err.Error())
return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
}
return reconcile.Result{}, errors.Wrap(err, "Error createVirtualCluster")
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrap(err, "Error createVirtualCluster")
}
updatedCluster.Status.Phase = v1alpha1.Initialized
err = c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error update virtualcluster %s status to %s. %v", updatedCluster.Name, updatedCluster.Status.Phase, err)
return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
}
if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil {
updatedCluster.Status.Phase = v1alpha1.Pending
updatedCluster.Status.Reason = err.Error()
err := c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
}
return reconcile.Result{}, err
return reconcile.Result{RequeueAfter: RequeueTime}, err
}
name, namespace := request.Name, request.Namespace
return reconcile.Result{}, c.DoAllNodeReadyCheck(name, namespace, originalCluster, updatedCluster)
Expand All @@ -229,23 +220,26 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re
}
err = c.assignWorkNodes(updatedCluster)
if err != nil {
updatedCluster.Status.Phase = v1alpha1.Pending
updatedCluster.Status.Reason = err.Error()
err := c.Update(updatedCluster)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s", updatedCluster.Name)
}
updatedCluster.Status.Phase = v1alpha1.Updating
err = c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
}
if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil {
updatedCluster.Status.Phase = v1alpha1.Pending
updatedCluster.Status.Reason = err.Error()
err := c.Update(updatedCluster)
if err != nil {
klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
return reconcile.Result{RequeueAfter: RequeueTime}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
}
return reconcile.Result{}, err
return reconcile.Result{RequeueAfter: RequeueTime}, err
}
name, namespace := request.Name, request.Namespace
return reconcile.Result{}, c.DoAllNodeReadyCheck(name, namespace, originalCluster, updatedCluster)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubenest/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
// add server
initPhase.AppendTask(tasks.NewComponentsFromManifestsTask())
initPhase.AppendTask(tasks.NewEndPointTask())
initPhase.AppendTask(tasks.NewFeaturesTask())

initPhase.SetDataInitializer(func() (workflow.RunData, error) {
return newRunData(opts)
Expand Down Expand Up @@ -104,6 +105,7 @@ func UninstallPhase(opts *InitOptions) *workflow.Phase {
destroyPhase.AppendTask(tasks.UninstallVirtualClusterServiceTask())
destroyPhase.AppendTask(tasks.UninstallCertsAndKubeconfigTask())
destroyPhase.AppendTask(tasks.DeleteEtcdPvcTask())
destroyPhase.AppendTask(tasks.DeleteFeaturesTask())
//destroyPhase.AppendTask(tasks.UninstallVirtualClusterProxyTask())

destroyPhase.SetDataInitializer(func() (workflow.RunData, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubenest/tasks/coredns.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func UninstallCoreDNSTask() workflow.Task {
}
}

func getCoreDNSHostComponentsConfig(client clientset.Interface, keyName string) ([]ComponentConfig, error) {
func getManifestComponentsConfig(client clientset.Interface, keyName string) ([]ComponentConfig, error) {
cm, err := client.CoreV1().ConfigMaps(constants.KosmosNs).Get(context.Background(), constants.ManifestComponentsConfigMap, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -115,7 +115,7 @@ func runCoreDNSHostTask(r workflow.RunData) error {

dynamicClient := data.DynamicClient()

components, err := getCoreDNSHostComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents)
components, err := getManifestComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents)
if err != nil {
return err
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func uninstallCorednsHostTask(r workflow.RunData) error {

dynamicClient := data.DynamicClient()

components, err := getCoreDNSHostComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents)
components, err := getManifestComponentsConfig(data.RemoteClient(), constants.HostCoreDnsComponents)
if err != nil {
return err
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func runCoreDNSVirtualTask(r workflow.RunData) error {
return err
}

components, err := getCoreDNSHostComponentsConfig(data.RemoteClient(), constants.VirtualCoreDNSComponents)
components, err := getManifestComponentsConfig(data.RemoteClient(), constants.VirtualCoreDNSComponents)
if err != nil {
return err
}
Expand Down
155 changes: 155 additions & 0 deletions pkg/kubenest/tasks/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package tasks

import (
"fmt"
"strings"

"github.com/pkg/errors"
"k8s.io/klog/v2"

"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
"github.com/kosmos.io/kosmos/pkg/kubenest/util"
"github.com/kosmos.io/kosmos/pkg/kubenest/workflow"
)

func NewFeaturesTask() workflow.Task {
return workflow.Task{
Name: "install_features",
Run: runFeatures,
RunSubTasks: true,
}
}

func DeleteFeaturesTask() workflow.Task {
return workflow.Task{
Name: "uninstall_features",
Run: cleanFeatures,
RunSubTasks: true,
}
}

func doExportEtcd(data InitData, components []ComponentConfig) error {
component, err := findComponent(components, constants.ExposeEtcd)
if err != nil {
return err
}

dynamicClient := data.DynamicClient()
imageRepository, _ := util.GetImageMessage()

klog.V(2).Infof("Deploy component %s", component.Name)
templatedMapping := map[string]interface{}{
"Namespace": data.GetNamespace(),
"Name": data.GetName(),
"ImageRepository": imageRepository,
"EtcdListenClientPort": constants.EtcdListenClientPort,
}
for k, v := range data.PluginOptions() {
templatedMapping[k] = v
}

err = applyYMLTemplate(dynamicClient, component.Path, templatedMapping)
if err != nil {
return err
}

return nil
}

func findComponent(components []ComponentConfig, name string) (ComponentConfig, error) {
for _, compcomponent := range components {
if compcomponent.Name == name {
return compcomponent, nil
}
}

return ComponentConfig{}, fmt.Errorf("component %s not found", name)
}

func runFeatures(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("features task invoked with an invalid data struct")
}

klog.V(4).InfoS("[features] Running feature task", "virtual cluster", klog.KObj(data))

vc := data.VirtualCluster()
features := strings.Split(vc.Annotations[constants.EnabledFeaturesAnnotation], ",")

components, err := getManifestComponentsConfig(data.RemoteClient(), constants.FeatureComponents)
if err != nil {
return err
}

for _, f := range features {
switch strings.TrimSpace(f) {
case constants.ExposeEtcd:
if err := doExportEtcd(data, components); err != nil {
return err
}
default:
klog.V(2).InfoS("[features] Unknown feature", "feature", f)
}
}

return nil
}

func cleanExportEtcd(data InitData, components []ComponentConfig) error {
component, err := findComponent(components, constants.ExposeEtcd)
if err != nil {
return err
}

dynamicClient := data.DynamicClient()
imageRepository, _ := util.GetImageMessage()

klog.V(2).Infof("Delete component %s", component.Name)
templatedMapping := map[string]interface{}{
"Namespace": data.GetNamespace(),
"Name": data.GetName(),
"ImageRepository": imageRepository,
"EtcdListenClientPort": constants.EtcdListenClientPort,
}
for k, v := range data.PluginOptions() {
templatedMapping[k] = v
}

err = deleteYMLTemplate(dynamicClient, component.Path, templatedMapping)
if err != nil {
return err
}

return nil
}

func cleanFeatures(r workflow.RunData) error {
data, ok := r.(InitData)
if !ok {
return errors.New("features task invoked with an invalid data struct")
}

klog.V(4).InfoS("[features] Running feature task", "virtual cluster", klog.KObj(data))

vc := data.VirtualCluster()
features := strings.Split(vc.Annotations[constants.EnabledFeaturesAnnotation], ",")

components, err := getManifestComponentsConfig(data.RemoteClient(), constants.FeatureComponents)
if err != nil {
return err
}

for _, f := range features {
switch strings.TrimSpace(f) {
case constants.ExposeEtcd:
if err := cleanExportEtcd(data, components); err != nil {
return err
}
default:
klog.V(2).InfoS("[features] Unknown feature", "feature", f)
}
}

return nil
}
Loading