diff --git a/cmd/kubenest/operator/app/operator.go b/cmd/kubenest/operator/app/operator.go
index 22efe6845..124f1c5f1 100644
--- a/cmd/kubenest/operator/app/operator.go
+++ b/cmd/kubenest/operator/app/operator.go
@@ -27,7 +27,6 @@ import (
 	endpointscontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/endpoints.sync.controller"
 	glnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/global.node.controller"
 	kosmos "github.com/kosmos.io/kosmos/pkg/kubenest/controller/kosmos"
-	vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller"
 	"github.com/kosmos.io/kosmos/pkg/scheme"
 	"github.com/kosmos.io/kosmos/pkg/sharedcli/klogflag"
 )
@@ -263,15 +262,15 @@ func run(ctx context.Context, config *config.Config) error {
 		return fmt.Errorf("could not create clientset: %v", err)
 	}
-	VirtualClusterInitController := controller.VirtualClusterInitController{
-		Client:          mgr.GetClient(),
-		Config:          mgr.GetConfig(),
-		EventRecorder:   mgr.GetEventRecorderFor(constants.InitControllerName),
-		RootClientSet:   hostKubeClient,
-		KosmosClient:    kosmosClient,
-		KubeNestOptions: &config.KubeNestOptions,
-		CoreNamespaces:  config.CoreNamespaces,
-	}
+	VirtualClusterInitController := controller.NewInitController(
+		mgr.GetClient(),
+		mgr.GetConfig(),
+		mgr.GetEventRecorderFor(constants.InitControllerName),
+		hostKubeClient,
+		kosmosClient,
+		&config.KubeNestOptions,
+		config.CoreNamespaces,
+	)
 	if err = VirtualClusterInitController.SetupWithManager(mgr); err != nil {
 		return fmt.Errorf("error starting %s: %v", constants.InitControllerName, err)
 	}
@@ -299,17 +298,17 @@ func run(ctx context.Context, config *config.Config) error {
 		return err
 	}
-	VirtualClusterNodeController := vcnodecontroller.NewNodeController(
-		mgr.GetClient(),
-		hostKubeClient,
-		mgr.GetEventRecorderFor(constants.NodeControllerName),
-		kosmosClient,
-		&config.KubeNestOptions,
-	)
+	// VirtualClusterNodeController := vcnodecontroller.NewNodeController(
+	// 	mgr.GetClient(),
+	// 	hostKubeClient,
+	// 	mgr.GetEventRecorderFor(constants.NodeControllerName),
+	// 	kosmosClient,
+	// 	&config.KubeNestOptions,
+	// )
-	if err = VirtualClusterNodeController.SetupWithManager(mgr); err != nil {
-		return fmt.Errorf("error starting %s: %v", constants.NodeControllerName, err)
-	}
+	// if err = VirtualClusterNodeController.SetupWithManager(mgr); err != nil {
+	// 	return fmt.Errorf("error starting %s: %v", constants.NodeControllerName, err)
+	// }
 	if config.KubeNestOptions.KubeNestType == v1alpha1.KosmosKube {
 		KosmosJoinController := kosmos.KosmosJoinController{
diff --git a/pkg/kosmosctl/cert/README.md b/pkg/kosmosctl/cert/README.md
new file mode 100644
index 000000000..24391ced5
--- /dev/null
+++ b/pkg/kosmosctl/cert/README.md
@@ -0,0 +1,5 @@
+# Update cert for virtual cluster
+
+```bash
+./kosmosctl renew cert --kubeconfig=config --namespace=test0318 --name=example0318 --agent-user=XXXX --agent-pass=XXXX
+```
\ No newline at end of file
diff --git a/pkg/kosmosctl/cert/constant.go b/pkg/kosmosctl/cert/constant.go
new file mode 100644
index 000000000..c294f11c1
--- /dev/null
+++ b/pkg/kosmosctl/cert/constant.go
@@ -0,0 +1,41 @@
+package cert
+
+const certShell = `
+#!/usr/bin/env bash
+
+source "env.sh"
+
+CERT_PATH=/apps/conf/kosmos/cert
+
+function update() {
+    echo "exec(1/3): copy ca.crt...."
+    cp "$CERT_PATH/ca.crt" "$PATH_KUBERNETES_PKI/ca.crt"
+    if [ $? -ne 0 ]; then
+        exit 1
+    fi
+    echo "exec(2/3): copy kubeconfig...."
+    cp "$CERT_PATH/kubelet.conf" "$PATH_KUBERNETES/$KUBELET_KUBE_CONFIG_NAME"
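+    # stop if the kubeconfig copy failed; the pki cleanup below would otherwise leave kubelet without valid credentials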
+    if [ $? -ne 0 ]; then
+        exit 1
+    fi
+
+    KUBELET_PKI_PATH="${PATH_KUBELET_LIB}/pki/*"
+    echo "exec(3/3): remove pki from kubelet.... ${KUBELET_PKI_PATH}"
+    rm -rf $KUBELET_PKI_PATH
+
+    systemctl restart kubelet
+    systemctl status kubelet
+
+
+}
+
+# See how we were called.
+case "$1" in
+    update)
+        update
+        ;;
+    *)
+        echo $"usage: $0 update"
+        exit 1
+esac
+`
diff --git a/pkg/kosmosctl/cert/option.go b/pkg/kosmosctl/cert/option.go
new file mode 100644
index 000000000..5a734c4f8
--- /dev/null
+++ b/pkg/kosmosctl/cert/option.go
@@ -0,0 +1,136 @@
+package cert
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"runtime"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8sruntime "k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/klog/v2"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
+	"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
+	"github.com/kosmos.io/kosmos/pkg/scheme"
+)
+
+type Option struct {
+	client.Client
+	remoteClient   clientset.Interface
+	kosmosClient   versioned.Interface
+	virtualCluster *v1alpha1.VirtualCluster
+	dynamicClient  *dynamic.DynamicClient
+	restConfig     *rest.Config
+}
+
+func NewCertOption(o *RenewOptions) (*Option, error) {
+	config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath)
+	if err != nil {
+		klog.Errorf("Failed to build config: %v", err)
+		return nil, err
+	}
+
+	cli, err := client.New(config, client.Options{Scheme: scheme.NewSchema()})
+	if err != nil {
+		klog.Errorf("Failed to create client: %v", err)
+		return nil, err
+	}
+
+	dynamicClient, err := dynamic.NewForConfig(config)
+	if err != nil {
+		klog.Errorf("Failed to create dynamic client: %v", err)
+		return nil, err
+	}
+
+	kosmosClient, err := versioned.NewForConfig(config)
+	if err != nil {
+		return nil, fmt.Errorf("error when creating kosmosClient client, err: %w", err)
+	}
+
+	localClusterClient, err := clientset.NewForConfig(config)
+	if err != nil {
+		return nil, fmt.Errorf("error when creating local cluster client, err: %w", err)
+	}
+
+	var remoteClient clientset.Interface = localClusterClient
+
+	gvr := schema.GroupVersionResource{
+		Group:    "kosmos.io",
+		Version:  "v1alpha1",
+		Resource: "virtualclusters",
+	}
+
+	unstructuredObj, err := dynamicClient.Resource(gvr).Namespace(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{})
+	if err != nil {
+		klog.Errorf("Failed to get CRD resources: %v", err)
+		return nil, err
+	}
+
+	var virtualCluster v1alpha1.VirtualCluster
+	err = k8sruntime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &virtualCluster)
+	if err != nil {
+		klog.Errorf("Error converting to structured object: %v", err)
+		return nil, err
+	}
+
+	return &Option{
+		Client:         cli,
+		remoteClient:   remoteClient,
+		kosmosClient:   kosmosClient,
+		virtualCluster: &virtualCluster,
+		dynamicClient:  dynamicClient,
+		restConfig:     config,
+	}, nil
+}
+
+func (c *Option) GetName() string {
+	return c.virtualCluster.GetName()
+}
+
+func (c *Option) GetNamespace() string {
+	return c.virtualCluster.GetNamespace()
+}
+
+func (c *Option) VirtualCluster() *v1alpha1.VirtualCluster {
+	return c.virtualCluster
+}
+
+func (c *Option) DynamicClient() *dynamic.DynamicClient {
+	return c.dynamicClient
+}
+
+func (c *Option) RemoteClient() clientset.Interface {
+	return c.remoteClient
+}
+
+func (c *Option) KosmosClient() versioned.Interface
{
+	return c.kosmosClient
+}
+
+func (c *Option) UpdateVirtualCluster(vc *v1alpha1.VirtualCluster) {
+	c.virtualCluster = vc
+}
+
+type TaskFunc func(*Option) error
+
+func getFunctionName(i interface{}) string {
+	return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
+}
+func RunTask(tasks []TaskFunc, r *Option) error {
+	total := len(tasks)
+	for index, task := range tasks {
+		klog.Infof("###################### Running task (%d/%d): [%s]", index+1, total, getFunctionName(task))
+		err := task(r)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/pkg/kosmosctl/cert/renew.go b/pkg/kosmosctl/cert/renew.go
index 0d57e0753..91a225e3e 100644
--- a/pkg/kosmosctl/cert/renew.go
+++ b/pkg/kosmosctl/cert/renew.go
@@ -1,35 +1,31 @@ package cert
 import (
-	"context"
 	"fmt"
+	"os"
 	"github.com/spf13/cobra"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/tools/clientcmd"
-	"k8s.io/klog"
+	"k8s.io/klog/v2"
 	ctlutil "k8s.io/kubectl/pkg/cmd/util"
 	"k8s.io/kubectl/pkg/util/i18n"
 	"k8s.io/kubectl/pkg/util/templates"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-
-	"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
-	"github.com/kosmos.io/kosmos/pkg/kubenest/controller"
-	"github.com/kosmos.io/kosmos/pkg/scheme"
 )
 var RenewCertExample = templates.Examples(i18n.T(`
 # Renew cert, e.g:
-	kosmosctl renew cert --kubeconfig=xxxx --namespace=xxxx --name=xxxx
+	kosmosctl renew cert --kubeconfig=xxxx --namespace=xxxx --name=xxxx --agent-user=xxxx --agent-pass=xxxx
 `))
 type RenewOptions struct {
 	Namespace      string
 	Name           string
 	KubeconfigPath string
+	NodeAgentOptions
+}
+
+type NodeAgentOptions struct {
+	WebUser string
+	WebPass string
 }
 func NewCmdRenewCert() *cobra.Command {
@@ -53,7 +49,8 @@ func NewCmdRenewCert() *cobra.Command {
 	flags.StringVarP(&o.Namespace, "namespace", "e", "", "namespace of vc")
 	flags.StringVarP(&o.Name, "name", "n", "", "name of vc")
 	flags.StringVarP(&o.KubeconfigPath, "kubeconfig", "k", "", "kubeconfig path of host cluster")
-
+	flags.StringVarP(&o.WebUser, "agent-user", "u", "", "user of node agent")
+	flags.StringVarP(&o.WebPass, "agent-pass", "p", "", "password of node agent")
 	return cmd
 }
@@ -62,60 +59,53 @@ func (o *RenewOptions) Complete() (err error) {
 }
 func (o *RenewOptions) Validate() error {
-	return nil
-}
-
-func (o *RenewOptions) Run() error {
-	klog.V(4).Info("kosmos-io.tar.gz has been saved successfully. ")
") - - config, err := clientcmd.BuildConfigFromFlags("", o.KubeconfigPath) - if err != nil { - klog.Infof("Failed to build config: %v\n", err) - return err + if len(o.WebPass) == 0 { + return fmt.Errorf("web pass is required") } - cli, err := client.New(config, client.Options{Scheme: scheme.NewSchema()}) - if err != nil { - klog.Infof("Failed to create client: %v\n", err) - return err + if len(o.WebUser) == 0 { + return fmt.Errorf("use pass is required") } - - dynamicClient, err := dynamic.NewForConfig(config) - if err != nil { - klog.Infof("Failed to create dynamic client: %v\n", err) - return err + if len(o.KubeconfigPath) == 0 { + return fmt.Errorf("kubeconfig path is required") } - - // 设置 CRD 的 Group、Version、Resource - gvr := schema.GroupVersionResource{ - Group: "kosmos.io", // CRD 的 API 组 - Version: "v1alpha1", // CRD 的版本 - Resource: "virtualclusters", // 资源的复数名称 + if len(o.Namespace) == 0 { + return fmt.Errorf("namespace is required") } - - // 获取 CRD 资源 - unstructuredObj, err := dynamicClient.Resource(gvr).Namespace(o.Namespace).Get(context.TODO(), o.Name, metav1.GetOptions{}) - if err != nil { - fmt.Printf("Failed to get CRD resources: %v\n", err) - return err + if len(o.Name) == 0 { + return fmt.Errorf("name is required") } + return nil +} - var virtualCluster v1alpha1.VirtualCluster - err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructuredObj.Object, &virtualCluster) - if err != nil { - klog.Infof("Error converting to structured object: %v\n", err) - return err - } +func (o *RenewOptions) initEnv() { + os.Setenv("KUBECONFIG", o.KubeconfigPath) + os.Setenv("WEB_USER", o.WebUser) + os.Setenv("WEB_PASS", o.WebPass) +} - exec, err := controller.UpdateCertPhase(&virtualCluster, cli, config, &v1alpha1.KubeNestConfiguration{}) +func (o *RenewOptions) Run() error { + r, err := NewCertOption(o) + o.initEnv() if err != nil { - panic(err) + return err } + return Do(r) +} - err = exec.Execute() +func Do(r *Option) error { + err := RunTask([]TaskFunc{ + RunCheckEnvironment, + RunBackupSecrets, + RunReCreateCertAndKubeConfig, + UpdateKubeProxyConfig, + RestartVirtualControlPlanePod, + RestartVirtualWorkerKubelet, + RestartVirtualPod, + }, r) if err != nil { - panic(err) + return err } - + klog.Infof("############ renew cert success!!!!") return nil } diff --git a/pkg/kosmosctl/cert/taskfunc.go b/pkg/kosmosctl/cert/taskfunc.go new file mode 100644 index 000000000..3b1bd41ac --- /dev/null +++ b/pkg/kosmosctl/cert/taskfunc.go @@ -0,0 +1,326 @@ +package cert + +import ( + "context" + "encoding/base64" + "fmt" + "os" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" + "k8s.io/klog/v2" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/exector" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" +) + +func RunBackupSecrets(data *Option) error { + klog.Infof("backup secrets for virtual cluster") + // create dir + vc := data.VirtualCluster() + dirName := BackDir(vc) + + err := os.Mkdir(dirName, 0755) + if err != nil { + if !os.IsExist(err) { + return fmt.Errorf("create backup dir failed: %s", err.Error()) + } + } + klog.InfoS("create backup dir successed:", dirName) + + // backup certs + secrets := []string{ + util.GetEtcdCertName(vc.GetName()), + util.GetCertName(vc.GetName()), + util.GetAdminConfigSecretName(vc.GetName()), + 
+		util.GetAdminConfigClusterIPSecretName(vc.GetName()),
+	}
+
+	for _, secret := range secrets {
+		klog.InfoS("backup secret", "secret", secret)
+		cert, err := data.RemoteClient().CoreV1().Secrets(vc.GetNamespace()).Get(context.TODO(), secret, metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+
+		err = SaveRuntimeObjectToYAML(cert, secret, dirName)
+
+		if err != nil {
+			return fmt.Errorf("write backup file failed: %s", err.Error())
+		}
+
+		klog.InfoS("backup secret succeeded", "file", secret)
+	}
+
+	return nil
+}
+
+func RunReCreateCertAndKubeConfig(data *Option) error {
+	klog.Infof("update ca.crt and kubeconfig for virtual cluster")
+	exec, err := controller.UpdateCertPhase(data.VirtualCluster(), data.Client, data.restConfig, &v1alpha1.KubeNestConfiguration{})
+	if err != nil {
+		return err
+	}
+
+	return exec.Execute()
+}
+
+func UpdateKubeProxyConfig(data *Option) error {
+	klog.Infof("update kube-proxy config map")
+	configCert, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), util.GetAdminConfigSecretName(data.GetName()), metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	kubeconfigstring := string(configCert.Data[constants.KubeConfig])
+
+	vc := data.VirtualCluster()
+
+	k8sClient, err := util.GenerateKubeclient(vc)
+	if err != nil {
+		return err
+	}
+
+	kubeproxycmkey := constants.KubeConfig + ".conf"
+
+	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		kubeproxycm, err := k8sClient.CoreV1().ConfigMaps("kube-system").Get(context.TODO(), "kube-proxy", metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+		kubeproxycm.Data[kubeproxycmkey] = kubeconfigstring
+		_, err = k8sClient.CoreV1().ConfigMaps("kube-system").Update(context.TODO(), kubeproxycm, metav1.UpdateOptions{})
+		return err
+	})
+
+	if err != nil {
+		return err
+	}
+
+	klog.Infof("save files to disk")
+	// save to dir
+	dirName := BackDir(vc)
+	err = SaveStringToDir(kubeconfigstring, "kubeconfig.conf", dirName)
+
+	if err != nil {
+		return fmt.Errorf("write backup file failed: %s", err.Error())
+	}
+	// update VC
+	err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
+		currentVC, err := data.KosmosClient().KosmosV1alpha1().VirtualClusters(vc.GetNamespace()).Get(context.TODO(), vc.GetName(), metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+		currentVC.Spec.Kubeconfig = base64.StdEncoding.EncodeToString([]byte(kubeconfigstring))
+		_, err = data.KosmosClient().KosmosV1alpha1().VirtualClusters(vc.GetNamespace()).Update(context.TODO(), currentVC, metav1.UpdateOptions{})
+		// necessary to update the cache
+		data.UpdateVirtualCluster(currentVC)
+		return err
+	})
+	if err != nil {
+		return err
+	}
+
+	// get ca.crt
+	cert, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), util.GetCertName(data.GetName()), metav1.GetOptions{})
+	if err != nil {
+		return err
+	}
+
+	cacrt := cert.Data[constants.CaCertAndKeyName+".crt"]
+
+	err = SaveStringToDir(string(cacrt), "ca.crt", dirName)
+	if err != nil {
+		return fmt.Errorf("write backup file failed: %s", err.Error())
+	}
+
+	return nil
+}
+
+func RestartVirtualControlPlanePod(data *Option) error {
+	klog.Infof("restart control-plane pod in host cluster")
+	vc := data.VirtualCluster()
+
+	namespace := vc.GetNamespace()
+	name := vc.GetName()
+	commands := [][]string{
+		{
+			"--kubeconfig",
+			HostClusterConfigPath(),
+			"-n", namespace,
+			"rollout",
+			"restart",
+			fmt.Sprintf("statefulset.apps/%s-etcd", name),
+			fmt.Sprintf("deployment.apps/%s-apiserver", name),
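+			// component names below follow the kubenest convention <vc-name>-<component>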
fmt.Sprintf("deployment.apps/%s-kube-controller-manager", name), + fmt.Sprintf("deployment.apps/%s-virtualcluster-scheduler", name), + fmt.Sprintf("deployment.apps/%s-coredns", name), + }, + } + + for _, args := range commands { + klog.InfoS("run command:", strings.Join(args, " ")) + if err := runKubectlCommand(args...); err != nil { + // Sometimes an exception occurs: Error from server (NotFound): deployments.apps "example0318-coredns" not found. + // This is not a problem, because the coredns is deployed in the virtual cluster. + klog.InfoS("run command failed:", err) + } + } + + // wait for pod ready + return WaitPodReady(data.RemoteClient(), namespace) +} + +func RestartVirtualPod(data *Option) error { + klog.Infof("restart pod in virtual cluster") + vc := data.VirtualCluster() + + dirName := BackDir(vc) + commands := [][]string{ + { + "--kubeconfig", + fmt.Sprintf("./%s/kubeconfig.conf", dirName), + "-n", + "kube-system", + "rollout", + "restart", + "deployment.apps/calico-typha", + "deployment.apps/calico-kube-controllers", + "daemonset.apps/calico-node", + "daemonset.apps/kube-proxy", + "daemonset.apps/konnectivity-agent", + }, + } + + for _, args := range commands { + klog.InfoS("run command:", strings.Join(args, " ")) + if err := runKubectlCommand(args...); err != nil { + klog.InfoS("run command failed:", err) + } + } + + k8sClient, err := util.GenerateKubeclient(vc) + if err != nil { + return err + } + + return WaitDaemonsetReady(k8sClient, "kube-system", "konnectivity-agent") +} + +func RestartVirtualWorkerKubelet(data *Option) error { + vc := data.VirtualCluster() + + klog.Infof("get ips of node-agent") + nodeIPs, err := GetVirtualNodeIP(data.KosmosClient(), vc) + if err != nil { + return err + } + + klog.Infof("send ca.crt and kubelet.conf to node and run cert.sh") + for _, nodeIP := range nodeIPs { + // back dir + dirName := BackDir(vc) + + exectHelper := exector.NewExectorHelper(nodeIP, "") + + // upload shell + scpShellCmd := &exector.SCPExector{ + DstFilePath: ".", + DstFileName: "cert.sh", + SrcByte: []byte(certShell), + } + + ret := exectHelper.DoExector(context.TODO().Done(), scpShellCmd) + if ret.Status != exector.SUCCESS { + return fmt.Errorf("scp shell to node %s failed: %s", nodeIP, ret.String()) + } + + // upload ca.crt + scpCrtCmd := &exector.SCPExector{ + DstFilePath: "/apps/conf/kosmos/cert/", + DstFileName: "ca.crt", + SrcFile: fmt.Sprintf("./%s/%s", dirName, "ca.crt"), + } + + ret = exectHelper.DoExector(context.TODO().Done(), scpCrtCmd) + if ret.Status != exector.SUCCESS { + return fmt.Errorf("scp ca.crt to node %s failed: %s", nodeIP, ret.String()) + } + + // upload kubeconfig + scpKubeconfiCmd := &exector.SCPExector{ + DstFilePath: "/apps/conf/kosmos/cert/", + DstFileName: "kubelet.conf", + SrcFile: fmt.Sprintf("./%s/%s", dirName, "kubeconfig.conf"), + } + + ret = exectHelper.DoExector(context.TODO().Done(), scpKubeconfiCmd) + if ret.Status != exector.SUCCESS { + return fmt.Errorf("scp kubeconfi to node %s failed: %s", nodeIP, ret.String()) + } + + updateCmd := &exector.CMDExector{ + Cmd: "bash cert.sh update", + } + + ret = exectHelper.DoExector(context.TODO().Done(), updateCmd) + if ret.Status != exector.SUCCESS { + return fmt.Errorf("do update shell on node %s failed: %s", nodeIP, ret.String()) + } + } + + k8sClient, err := util.GenerateKubeclient(vc) + if err != nil { + return err + } + + return WaitNodeReady(k8sClient) +} + +func RunCheckEnvironment(data *Option) error { + klog.Infof("try to connect to node-agent") + vc := data.VirtualCluster() + 
+	nodeIPs, err := GetVirtualNodeIP(data.KosmosClient(), vc)
+	if err != nil {
+		return err
+	}
+	for _, nodeIP := range nodeIPs {
+		exectHelper := exector.NewExectorHelper(nodeIP, "")
+
+		testCmd := &exector.CMDExector{
+			Cmd: "pwd",
+		}
+
+		ret := exectHelper.DoExector(context.TODO().Done(), testCmd)
+		if ret.Status != exector.SUCCESS {
+			return fmt.Errorf("do check on node %s failed: %s", nodeIP, ret.String())
+		}
+	}
+	klog.Infof("try to run command kubectl")
+	namespace := vc.GetNamespace()
+	name := vc.GetName()
+	commands := [][]string{
+		{
+			"--kubeconfig",
+			HostClusterConfigPath(),
+			"-n", namespace,
+			"get",
+			"vc",
+			name,
+		},
+	}
+
+	for _, args := range commands {
+		klog.InfoS("run command", "cmd", strings.Join(args, " "))
+		if err := runKubectlCommand(args...); err != nil {
+			klog.InfoS("run command failed", "err", err)
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/kosmosctl/cert/util.go b/pkg/kosmosctl/cert/util.go
new file mode 100644
index 000000000..e83808a29
--- /dev/null
+++ b/pkg/kosmosctl/cert/util.go
@@ -0,0 +1,203 @@
+package cert
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"time"
+
+	"github.com/pkg/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer/json"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/klog/v2"
+
+	"github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
+	"github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/constants"
+	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
+)
+
+func BackDir(vc *v1alpha1.VirtualCluster) string {
+	// back dir
+	return fmt.Sprintf("backup-%s-%s", vc.GetNamespace(), vc.GetName())
+}
+
+func SaveRuntimeObjectToYAML(obj runtime.Object, fileName, dirName string) error {
+	scheme := runtime.NewScheme()
+	serializer := json.NewSerializerWithOptions(json.DefaultMetaFactory, scheme, scheme, json.SerializerOptions{Yaml: true, Pretty: true})
+
+	filePath := filepath.Join(dirName, fmt.Sprintf("%s.yaml", fileName))
+	file, err := os.Create(filePath)
+	if err != nil {
+		return fmt.Errorf("failed to create file: %w", err)
+	}
+	defer file.Close()
+
+	err = serializer.Encode(obj, file)
+	if err != nil {
+		return fmt.Errorf("failed to serialize secret to YAML: %w", err)
+	}
+
+	return nil
+}
+
+// nolint
+func SaveStringToDir(data string, fileName, dirName string) error {
+	backupFilePath := filepath.Join(dirName, fileName)
+	err := os.WriteFile(backupFilePath, []byte(data), 0644)
+	if err != nil {
+		return fmt.Errorf("write backup file failed: %s", err.Error())
+	}
+
+	return err
+}
+
+func HostClusterConfigPath() string {
+	kubeconfigPath := os.Getenv("KUBECONFIG")
+	if len(kubeconfigPath) == 0 {
+		return filepath.Join(os.Getenv("HOME"), ".kube", "config")
+	}
+	return kubeconfigPath
+}
+
+func runKubectlCommand(args ...string) error {
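+	// CombinedOutput captures stdout and stderr together so a failure surfaces kubectl's own message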
+	cmd := exec.Command("kubectl", args...)
+	output, err := cmd.CombinedOutput()
+	if err != nil {
+		return fmt.Errorf("exec cmd failed: %s, err: %v", string(output), err)
+	}
+	fmt.Println(string(output))
+	return nil
+}
+
+func WaitPodReady(k8sClient kubernetes.Interface, namespace string) error {
+	timeout := constants.WaitCorePodsRunningTimeout
+
+	klog.Infof("Check if all pods ready in namespace %s", namespace)
+	err := wait.PollWithContext(context.TODO(), 5*time.Second, timeout, func(ctx context.Context) (done bool, err error) {
+		klog.Infof("Check if all deployments are ready in namespace %s", namespace)
+		deployList, err := k8sClient.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return false, errors.Wrapf(err, "Get deployment list in namespace %s error", namespace)
+		}
+		for _, deploy := range deployList.Items {
+			if deploy.Status.AvailableReplicas != deploy.Status.Replicas {
+				klog.Infof("Deployment %s/%s is not ready yet. Available replicas: %d, Desired: %d. Waiting...", deploy.Name, namespace, deploy.Status.AvailableReplicas, deploy.Status.Replicas)
+				return false, nil
+			}
+		}
+
+		klog.Infof("Check if all statefulsets are ready in namespace %s", namespace)
+		stsList, err := k8sClient.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return false, errors.Wrapf(err, "Get statefulset list in namespace %s error", namespace)
+		}
+		for _, sts := range stsList.Items {
+			if sts.Status.AvailableReplicas != sts.Status.Replicas {
+				klog.Infof("Statefulset %s/%s is not ready yet. Available replicas: %d, Desired: %d. Waiting...", sts.Name, namespace, sts.Status.AvailableReplicas, sts.Status.Replicas)
+				return false, nil
+			}
+		}
+
+		klog.Infof("Check if all daemonsets are ready in namespace %s", namespace)
+		daemonsetList, err := k8sClient.AppsV1().DaemonSets(namespace).List(ctx, metav1.ListOptions{})
+		if err != nil {
+			return false, errors.Wrapf(err, "Get daemonset list in namespace %s error", namespace)
+		}
+		for _, daemonset := range daemonsetList.Items {
+			if daemonset.Status.CurrentNumberScheduled != daemonset.Status.NumberReady {
+				klog.Infof("Daemonset %s/%s is not ready yet. Scheduled replicas: %d, Ready: %d. Waiting...", daemonset.Name, namespace, daemonset.Status.CurrentNumberScheduled, daemonset.Status.NumberReady)
+				return false, nil
+			}
+		}
+
+		return true, nil
+	})
+	return err
+}
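+// WaitNodeReady polls until every node in the virtual cluster reports Ready, bounded by the shared timeout.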
Waiting...", daemonset.Name, namespace, daemonset.Status.CurrentNumberScheduled, daemonset.Status.NumberReady) + return false, nil + } + } + + return true, nil + }) + return err +} + +func WaitNodeReady(k8sClient kubernetes.Interface) error { + // wait all node ready + timeout := constants.WaitCorePodsRunningTimeout + + endTime := time.Now().Second() + int(timeout.Seconds()) + + startTime := time.Now().Second() + if startTime > endTime { + return errors.New("Timeout waiting for all pods running") + } + klog.Infof("Check if all node ready") + err := wait.PollWithContext(context.TODO(), 5*time.Second, time.Duration(endTime-startTime)*time.Second, func(ctx context.Context) (done bool, err error) { + nodes, err := k8sClient.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + + for _, node := range nodes.Items { + if !util.IsNodeReady(node.Status.Conditions) { + return false, nil + } + } + return true, nil + }) + return err +} + +func WaitDaemonsetReady(k8sClient kubernetes.Interface, namespace, name string) error { + timeout := constants.WaitCorePodsRunningTimeout + + endTime := time.Now().Second() + int(timeout.Seconds()) + + startTime := time.Now().Second() + if startTime > endTime { + return errors.New("Timeout waiting for all pods running") + } + klog.Infof("Check if all pods ready in namespace %s", namespace) + err := wait.PollWithContext(context.TODO(), 5*time.Second, time.Duration(endTime-startTime)*time.Second, func(ctx context.Context) (done bool, err error) { + klog.Infof("Check if virtualcluster %s all daemonset ready in namespace %s", namespace) + damonsetList, err := k8sClient.AppsV1().DaemonSets(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return false, errors.Wrapf(err, "Get daemonset list in namespace %s error", namespace) + } + for _, daemonset := range damonsetList.Items { + if daemonset.GetName() != name { + continue + } + if daemonset.Status.CurrentNumberScheduled != daemonset.Status.NumberReady { + klog.Infof("Daemonset %s/%s is not ready yet. Scheduled replicas: %d, Ready: %d. 
Waiting...", daemonset.Name, namespace, daemonset.Status.CurrentNumberScheduled, daemonset.Status.NumberReady) + return false, nil + } + } + + return true, nil + }) + return err +} + +func GetVirtualNodeIP(kosmosClient versioned.Interface, vc *v1alpha1.VirtualCluster) ([]string, error) { + globalNodeList, err := kosmosClient.KosmosV1alpha1().GlobalNodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("list global nodes: %w", err) + } + nodeIPs := []string{} + nodeInfos := vc.Spec.PromoteResources.NodeInfos + for _, node := range globalNodeList.Items { + for _, nodeInfo := range nodeInfos { + if node.GetName() == nodeInfo.NodeName { + nodeIPs = append(nodeIPs, node.Spec.NodeIP) + } + } + } + return nodeIPs, nil +} diff --git a/pkg/kubenest/controller/global.node.controller/global_node_controller.go b/pkg/kubenest/controller/global.node.controller/global_node_controller.go index d603cd285..d29b03ccf 100644 --- a/pkg/kubenest/controller/global.node.controller/global_node_controller.go +++ b/pkg/kubenest/controller/global.node.controller/global_node_controller.go @@ -26,7 +26,7 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/env" "github.com/kosmos.io/kosmos/pkg/kubenest/util" "github.com/kosmos.io/kosmos/pkg/utils" ) diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go deleted file mode 100644 index 44c052b89..000000000 --- a/pkg/kubenest/controller/virtualcluster.node.controller/node_controller.go +++ /dev/null @@ -1,386 +0,0 @@ -package vcnodecontroller - -import ( - "context" - "fmt" - "sync" - - "github.com/pkg/errors" - v1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/record" - "k8s.io/client-go/util/retry" - "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" - "github.com/kosmos.io/kosmos/pkg/kubenest/constants" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task" - "github.com/kosmos.io/kosmos/pkg/kubenest/util" - "github.com/kosmos.io/kosmos/pkg/utils" -) - -type NodeController struct { - client.Client - RootClientSet kubernetes.Interface - EventRecorder record.EventRecorder - KosmosClient versioned.Interface - Options *v1alpha1.KubeNestConfiguration - sem chan struct{} -} - -func NewNodeController(client client.Client, RootClientSet kubernetes.Interface, EventRecorder 
record.EventRecorder, KosmosClient versioned.Interface, options *v1alpha1.KubeNestConfiguration) *NodeController { - r := NodeController{ - Client: client, - RootClientSet: RootClientSet, - EventRecorder: EventRecorder, - KosmosClient: KosmosClient, - Options: options, - sem: make(chan struct{}, env.GetNodeTaskMaxGoroutines()), - } - return &r -} - -func (r *NodeController) SetupWithManager(mgr manager.Manager) error { - if r.Client == nil { - r.Client = mgr.GetClient() - } - - skipEvent := func(_ client.Object) bool { - return true - } - - return ctrl.NewControllerManagedBy(mgr). - Named(constants.NodeControllerName). - WithOptions(controller.Options{}). - For(&v1alpha1.VirtualCluster{}, builder.WithPredicates(predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - return skipEvent(createEvent.Object) - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { - return skipEvent(updateEvent.ObjectNew) - }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - return skipEvent(deleteEvent.Object) - }, - GenericFunc: func(genericEvent event.GenericEvent) bool { - return skipEvent(genericEvent.Object) - }, - })). - Complete(r) -} - -func hasItemInArray(name string, f func(string) bool) bool { - return f(name) -} - -func (r *NodeController) compareAndTranformNodes(ctx context.Context, targetNodes []v1alpha1.NodeInfo, actualNodes []v1.Node) ([]v1alpha1.GlobalNode, []v1alpha1.GlobalNode, error) { - unjoinNodes := make([]v1alpha1.GlobalNode, 0) - joinNodes := make([]v1alpha1.GlobalNode, 0) - - globalNodes := &v1alpha1.GlobalNodeList{} - if err := r.Client.List(ctx, globalNodes); err != nil { - return nil, nil, fmt.Errorf("failed to list global nodes: %v", err) - } - - // cacheMap := map[string]string{} - for _, targetNode := range targetNodes { - has := hasItemInArray(targetNode.NodeName, func(name string) bool { - for _, actualNode := range actualNodes { - if actualNode.Name == name { - return true - } - } - return false - }) - - if !has { - globalNode, ok := util.FindGlobalNode(targetNode.NodeName, globalNodes.Items) - if !ok { - return nil, nil, fmt.Errorf("global node %s not found", targetNode.NodeName) - } - joinNodes = append(joinNodes, *globalNode) - } - } - - for _, actualNode := range actualNodes { - has := hasItemInArray(actualNode.Name, func(name string) bool { - for _, targetNode := range targetNodes { - if targetNode.NodeName == name { - return true - } - } - return false - }) - - if !has { - globalNode, ok := util.FindGlobalNode(actualNode.Name, globalNodes.Items) - if !ok { - return nil, nil, fmt.Errorf("global node %s not found", actualNode.Name) - } - unjoinNodes = append(unjoinNodes, *globalNode) - } - } - - return unjoinNodes, joinNodes, nil -} - -func (r *NodeController) UpdateVirtualClusterStatus(ctx context.Context, virtualCluster v1alpha1.VirtualCluster, status v1alpha1.Phase, reason string) error { - retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - var targetObj v1alpha1.VirtualCluster - if err := r.Get(ctx, types.NamespacedName{Name: virtualCluster.Name, Namespace: virtualCluster.Namespace}, &targetObj); err != nil { - klog.Warningf("get target virtualcluster %s namespace %s failed: %v", virtualCluster.Name, virtualCluster.Namespace, err) - return err - } - updateVirtualCluster := targetObj.DeepCopy() - if len(status) > 0 { - updateVirtualCluster.Status.Phase = status - } - updateVirtualCluster.Status.Reason = reason - updateTime := metav1.Now() - updateVirtualCluster.Status.UpdateTime = &updateTime - if _, err := 
r.KosmosClient.KosmosV1alpha1().VirtualClusters(updateVirtualCluster.Namespace).Update(ctx, updateVirtualCluster, metav1.UpdateOptions{}); err != nil && !apierrors.IsNotFound(err) { - klog.Warningf("update target virtualcluster %s namespace %s failed: %v", virtualCluster.Name, virtualCluster.Namespace, err) - return err - } - return nil - }) - - if retryErr != nil { - return fmt.Errorf("update virtualcluster %s status namespace %s failed: %s", virtualCluster.Name, virtualCluster.Namespace, retryErr) - } - - r.EventRecorder.Event(&virtualCluster, v1.EventTypeWarning, "VCStatusPending", fmt.Sprintf("Name: %s, Namespace: %s, reason: %s", virtualCluster.Name, virtualCluster.Namespace, reason)) - - return nil -} - -func (r *NodeController) DoNodeTask(ctx context.Context, virtualCluster v1alpha1.VirtualCluster) error { - k8sClient, err := util.GenerateKubeclient(&virtualCluster) - if err != nil { - return fmt.Errorf("virtualcluster %s crd kubernetes client failed: %v", virtualCluster.Name, err) - } - - nodes, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) - if err != nil { - return fmt.Errorf("virtualcluster %s get virtual-cluster nodes list failed: %v", virtualCluster.Name, err) - } - - // compare cr and actual nodes in k8s - unjoinNodes, joinNodes, err := r.compareAndTranformNodes(ctx, virtualCluster.Spec.PromoteResources.NodeInfos, nodes.Items) - if err != nil { - return fmt.Errorf("compare cr and actual nodes failed, virtual-cluster-name: %v, err: %s", virtualCluster.Name, err) - } - - if len(unjoinNodes) > 0 || len(joinNodes) > 0 { - if virtualCluster.Status.Phase != v1alpha1.Initialized && virtualCluster.Status.Phase != v1alpha1.Deleting { - if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Updating, "node task"); err != nil { - return err - } - } - } - if len(unjoinNodes) > 0 { - // unjoin node - if err := r.unjoinNode(ctx, unjoinNodes, virtualCluster, k8sClient); err != nil { - return fmt.Errorf("virtualcluster %s unjoin node failed: %v", virtualCluster.Name, err) - } - } - if len(joinNodes) > 0 { - // join node - if err := r.joinNode(ctx, joinNodes, virtualCluster, k8sClient); err != nil { - return fmt.Errorf("virtualcluster %s join node failed: %v", virtualCluster.Name, err) - } - } - - if len(unjoinNodes) > 0 || len(joinNodes) > 0 { - newStatus := v1alpha1.AllNodeReady - if virtualCluster.Status.Phase == v1alpha1.Deleting { - newStatus = v1alpha1.AllNodeDeleted - } - if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, newStatus, "node ready"); err != nil { - return err - } - } - return nil -} - -func (r *NodeController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { - klog.V(4).Infof("============ virtual-cluster-node-controller start to reconcile %s ============", request.NamespacedName) - defer klog.V(4).Infof("============ virtual-cluster-node-controller finish to reconcile %s ============", request.NamespacedName) - - // check virtual cluster nodes - var virtualCluster v1alpha1.VirtualCluster - if err := r.Get(ctx, request.NamespacedName, &virtualCluster); err != nil { - if apierrors.IsNotFound(err) { - klog.V(4).Infof("virtual-cluster-node-controller: can not found %s", request.NamespacedName) - return reconcile.Result{}, nil - } - klog.Errorf("get clusternode %s error: %v", request.NamespacedName, err) - if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error()); err != nil { - klog.Errorf("update virtualcluster %s status error: %v", request.NamespacedName, err) - 
} - return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil - } - - if !virtualCluster.GetDeletionTimestamp().IsZero() && virtualCluster.Status.Phase != v1alpha1.Deleting { - klog.V(4).Info("virtualcluster %s is deleting, skip node controller", virtualCluster.Name) - return reconcile.Result{}, nil - } - - if !virtualCluster.GetDeletionTimestamp().IsZero() && len(virtualCluster.Spec.Kubeconfig) == 0 { - if err := r.DoNodeClean(ctx, virtualCluster); err != nil { - klog.Errorf("virtualcluster %s do node clean failed: %v", virtualCluster.Name, err) - if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error()); err != nil { - klog.Errorf("update virtualcluster %s status error: %v", request.NamespacedName, err) - } - return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil - } - return reconcile.Result{}, nil - } - - if virtualCluster.Status.Phase == v1alpha1.Preparing { - klog.V(4).Infof("virtualcluster wait cluster ready, cluster name: %s", virtualCluster.Name) - return reconcile.Result{}, nil - } - - if virtualCluster.Status.Phase == v1alpha1.Pending { - klog.V(4).Infof("virtualcluster is pending, cluster name: %s", virtualCluster.Name) - return reconcile.Result{}, nil - } - - if len(virtualCluster.Spec.Kubeconfig) == 0 { - klog.Warning("virtualcluster.spec.kubeconfig is nil, wait virtualcluster control-plane ready.") - return reconcile.Result{}, nil - } - - if err := r.DoNodeTask(ctx, virtualCluster); err != nil { - klog.Errorf("virtualcluster %s do node task failed: %v", virtualCluster.Name, err) - if err := r.UpdateVirtualClusterStatus(ctx, virtualCluster, v1alpha1.Pending, err.Error()); err != nil { - klog.Errorf("update virtualcluster %s status error: %v", request.NamespacedName, err) - } - return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, nil - } - - return reconcile.Result{}, nil -} - -func (r *NodeController) DoNodeClean(ctx context.Context, virtualCluster v1alpha1.VirtualCluster) error { - targetNodes := virtualCluster.Spec.PromoteResources.NodeInfos - globalNodes := &v1alpha1.GlobalNodeList{} - - if err := r.Client.List(ctx, globalNodes); err != nil { - return fmt.Errorf("failed to list global nodes: %v", err) - } - - cleanNodeInfos := []v1alpha1.GlobalNode{} - - for _, targetNode := range targetNodes { - globalNode, ok := util.FindGlobalNode(targetNode.NodeName, globalNodes.Items) - if !ok { - return fmt.Errorf("global node %s not found", targetNode.NodeName) - } - cleanNodeInfos = append(cleanNodeInfos, *globalNode) - } - - return r.cleanGlobalNode(ctx, cleanNodeInfos, virtualCluster, nil) -} - -func (r *NodeController) cleanGlobalNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, _ kubernetes.Interface) error { - return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error { - return workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{ - NodeInfo: nodeInfo, - VirtualCluster: virtualCluster, - HostClient: r.Client, - HostK8sClient: r.RootClientSet, - Opt: r.Options, - }) - }) -} - -func (r *NodeController) joinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { - if len(nodeInfos) == 0 { - return nil - } - - clusterDNS := "" - dnssvc, err := k8sClient.CoreV1().Services(constants.SystemNs).Get(ctx, constants.KubeDNSSVCName, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("get kube-dns service failed: %s", err) - } - clusterDNS = 
dnssvc.Spec.ClusterIP - - return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error { - return workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{ - NodeInfo: nodeInfo, - VirtualCluster: virtualCluster, - KubeDNSAddress: clusterDNS, - HostClient: r.Client, - HostK8sClient: r.RootClientSet, - VirtualK8sClient: k8sClient, - Opt: r.Options, - }) - }) -} - -func (r *NodeController) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { - return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error { - return workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{ - NodeInfo: nodeInfo, - VirtualCluster: virtualCluster, - HostClient: r.Client, - HostK8sClient: r.RootClientSet, - VirtualK8sClient: k8sClient, - Opt: r.Options, - }) - }) -} - -func (r *NodeController) BatchProcessNodes(nodeInfos []v1alpha1.GlobalNode, f func(v1alpha1.GlobalNode) error) error { - var wg sync.WaitGroup - errChan := make(chan error, len(nodeInfos)) - - for _, nodeInfo := range nodeInfos { - wg.Add(1) - r.sem <- struct{}{} - go func(nodeInfo v1alpha1.GlobalNode) { - defer wg.Done() - defer func() { <-r.sem }() - if err := f(nodeInfo); err != nil { - errChan <- fmt.Errorf("[%s] batchprocessnodes failed: %s", nodeInfo.Name, err) - } - }(nodeInfo) - } - - wg.Wait() - close(errChan) - - var taskErr error - for err := range errChan { - if err != nil { - if taskErr == nil { - taskErr = err - } else { - taskErr = errors.Wrap(err, taskErr.Error()) - } - } - } - - if taskErr != nil { - return taskErr - } - - return nil -} diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/env/env.go b/pkg/kubenest/controller/virtualcluster.node.manager/env/env.go similarity index 100% rename from pkg/kubenest/controller/virtualcluster.node.controller/env/env.go rename to pkg/kubenest/controller/virtualcluster.node.manager/env/env.go diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go b/pkg/kubenest/controller/virtualcluster.node.manager/exector/exector.go similarity index 99% rename from pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go rename to pkg/kubenest/controller/virtualcluster.node.manager/exector/exector.go index 3df3767e8..f8728f156 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/exector/exector.go +++ b/pkg/kubenest/controller/virtualcluster.node.manager/exector/exector.go @@ -10,7 +10,7 @@ import ( "github.com/gorilla/websocket" "k8s.io/klog/v2" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/env" "github.com/kosmos.io/kosmos/pkg/utils" ) diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_check.go b/pkg/kubenest/controller/virtualcluster.node.manager/exector/remote_check.go similarity index 100% rename from pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_check.go rename to pkg/kubenest/controller/virtualcluster.node.manager/exector/remote_check.go diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_cmd.go b/pkg/kubenest/controller/virtualcluster.node.manager/exector/remote_cmd.go similarity index 96% rename from pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_cmd.go rename to pkg/kubenest/controller/virtualcluster.node.manager/exector/remote_cmd.go index 
98a9aed2b..f6585e163 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_cmd.go +++ b/pkg/kubenest/controller/virtualcluster.node.manager/exector/remote_cmd.go @@ -6,7 +6,7 @@ import ( "github.com/gorilla/websocket" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/env" ) type CMDExector struct { diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_scp.go b/pkg/kubenest/controller/virtualcluster.node.manager/exector/remote_scp.go similarity index 100% rename from pkg/kubenest/controller/virtualcluster.node.controller/exector/remote_scp.go rename to pkg/kubenest/controller/virtualcluster.node.manager/exector/remote_scp.go diff --git a/pkg/kubenest/controller/virtualcluster.node.manager/node_controller.go b/pkg/kubenest/controller/virtualcluster.node.manager/node_controller.go new file mode 100644 index 000000000..307ccd9d5 --- /dev/null +++ b/pkg/kubenest/controller/virtualcluster.node.manager/node_controller.go @@ -0,0 +1,254 @@ +package vcnodemanager + +import ( + "context" + "fmt" + "sync" + + "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" + "github.com/kosmos.io/kosmos/pkg/kubenest/constants" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/env" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/workflow" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/workflow/task" + "github.com/kosmos.io/kosmos/pkg/kubenest/util" +) + +type NodeManager struct { + client.Client + RootClientSet kubernetes.Interface + Options *v1alpha1.KubeNestConfiguration + sem chan struct{} +} + +func NewNodeManager(client client.Client, RootClientSet kubernetes.Interface, options *v1alpha1.KubeNestConfiguration) *NodeManager { + r := NodeManager{ + Client: client, + RootClientSet: RootClientSet, + Options: options, + sem: make(chan struct{}, env.GetNodeTaskMaxGoroutines()), + } + return &r +} + +func hasItemInArray(name string, f func(string) bool) bool { + return f(name) +} + +func (r *NodeManager) compareAndTranformNodes(ctx context.Context, targetNodes []v1alpha1.NodeInfo, actualNodes []v1.Node) ([]v1alpha1.GlobalNode, []v1alpha1.GlobalNode, error) { + unjoinNodes := make([]v1alpha1.GlobalNode, 0) + joinNodes := make([]v1alpha1.GlobalNode, 0) + + globalNodes := &v1alpha1.GlobalNodeList{} + if err := r.Client.List(ctx, globalNodes); err != nil { + return nil, nil, fmt.Errorf("failed to list global nodes: %v", err) + } + + // cacheMap := map[string]string{} + for _, targetNode := range targetNodes { + has := hasItemInArray(targetNode.NodeName, func(name string) bool { + for _, actualNode := range actualNodes { + if actualNode.Name == name { + return true + } + } + return false + }) + + if !has { + globalNode, ok := util.FindGlobalNode(targetNode.NodeName, globalNodes.Items) + if !ok { + return nil, nil, fmt.Errorf("global node %s not found", targetNode.NodeName) + } + joinNodes = append(joinNodes, *globalNode) + } + } + + for _, actualNode := range actualNodes { + has := hasItemInArray(actualNode.Name, func(name string) bool { + for _, targetNode := range targetNodes { + if targetNode.NodeName == name { + return true + 
}
+			}
+			return false
+		})
+
+		if !has {
+			globalNode, ok := util.FindGlobalNode(actualNode.Name, globalNodes.Items)
+			if !ok {
+				return nil, nil, fmt.Errorf("global node %s not found", actualNode.Name)
+			}
+			unjoinNodes = append(unjoinNodes, *globalNode)
+		}
+	}
+
+	return unjoinNodes, joinNodes, nil
+}
+
+func (r *NodeManager) DoNodeTask(ctx context.Context, virtualCluster v1alpha1.VirtualCluster) error {
+	k8sClient, err := util.GenerateKubeclient(&virtualCluster)
+	if err != nil {
+		return fmt.Errorf("virtualcluster %s crd kubernetes client failed: %v", virtualCluster.Name, err)
+	}
+
+	nodes, err := k8sClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return fmt.Errorf("virtualcluster %s get virtual-cluster nodes list failed: %v", virtualCluster.Name, err)
+	}
+
+	// compare cr and actual nodes in k8s
+	unjoinNodes, joinNodes, err := r.compareAndTranformNodes(ctx, virtualCluster.Spec.PromoteResources.NodeInfos, nodes.Items)
+	if err != nil {
+		return fmt.Errorf("compare cr and actual nodes failed, virtual-cluster-name: %v, err: %s", virtualCluster.Name, err)
+	}
+
+	if len(unjoinNodes) > 0 {
+		// unjoin node
+		if err := r.unjoinNode(ctx, unjoinNodes, virtualCluster, k8sClient); err != nil {
+			return fmt.Errorf("virtualcluster %s unjoin node failed: %v", virtualCluster.Name, err)
+		}
+	}
+	if len(joinNodes) > 0 {
+		// join node
+		if err := r.joinNode(ctx, joinNodes, virtualCluster, k8sClient); err != nil {
+			return fmt.Errorf("virtualcluster %s join node failed: %v", virtualCluster.Name, err)
+		}
+	}
+
+	return nil
+}
+func (r *NodeManager) NodeDelete(ctx context.Context, virtualCluster v1alpha1.VirtualCluster) error {
+	if err := r.DoNodeClean(ctx, virtualCluster); err != nil {
+		klog.Errorf("virtualcluster %s do node clean failed: %v", virtualCluster.Name, err)
+		return err
+	}
+	return nil
+}
+
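+// NodeUpdate reconciles the virtual cluster's node set, joining missing nodes and unjoining removed ones.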
%s", virtualCluster.GetName()) + } + + if err := r.DoNodeTask(ctx, virtualCluster); err != nil { + klog.Errorf("virtualcluster %s do node task failed: %v", virtualCluster.Name, err) + return fmt.Errorf("update virtualcluster %s status error: %v", virtualCluster.GetName(), err) + } + return nil +} + +func (r *NodeManager) DoNodeClean(ctx context.Context, virtualCluster v1alpha1.VirtualCluster) error { + targetNodes := virtualCluster.Spec.PromoteResources.NodeInfos + globalNodes := &v1alpha1.GlobalNodeList{} + + if err := r.Client.List(ctx, globalNodes); err != nil { + return fmt.Errorf("failed to list global nodes: %v", err) + } + + cleanNodeInfos := []v1alpha1.GlobalNode{} + + for _, targetNode := range targetNodes { + globalNode, ok := util.FindGlobalNode(targetNode.NodeName, globalNodes.Items) + if !ok { + return fmt.Errorf("global node %s not found", targetNode.NodeName) + } + cleanNodeInfos = append(cleanNodeInfos, *globalNode) + } + + return r.cleanGlobalNode(ctx, cleanNodeInfos, virtualCluster, nil) +} + +func (r *NodeManager) cleanGlobalNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, _ kubernetes.Interface) error { + return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error { + return workflow.NewCleanNodeWorkFlow().RunTask(ctx, task.TaskOpt{ + NodeInfo: nodeInfo, + VirtualCluster: virtualCluster, + HostClient: r.Client, + HostK8sClient: r.RootClientSet, + Opt: r.Options, + }) + }) +} + +func (r *NodeManager) joinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { + if len(nodeInfos) == 0 { + return nil + } + + clusterDNS := "" + dnssvc, err := k8sClient.CoreV1().Services(constants.SystemNs).Get(ctx, constants.KubeDNSSVCName, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("get kube-dns service failed: %s", err) + } + clusterDNS = dnssvc.Spec.ClusterIP + + return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error { + return workflow.NewJoinWorkFlow().RunTask(ctx, task.TaskOpt{ + NodeInfo: nodeInfo, + VirtualCluster: virtualCluster, + KubeDNSAddress: clusterDNS, + HostClient: r.Client, + HostK8sClient: r.RootClientSet, + VirtualK8sClient: k8sClient, + Opt: r.Options, + }) + }) +} + +func (r *NodeManager) unjoinNode(ctx context.Context, nodeInfos []v1alpha1.GlobalNode, virtualCluster v1alpha1.VirtualCluster, k8sClient kubernetes.Interface) error { + return r.BatchProcessNodes(nodeInfos, func(nodeInfo v1alpha1.GlobalNode) error { + return workflow.NewUnjoinWorkFlow().RunTask(ctx, task.TaskOpt{ + NodeInfo: nodeInfo, + VirtualCluster: virtualCluster, + HostClient: r.Client, + HostK8sClient: r.RootClientSet, + VirtualK8sClient: k8sClient, + Opt: r.Options, + }) + }) +} + +func (r *NodeManager) BatchProcessNodes(nodeInfos []v1alpha1.GlobalNode, f func(v1alpha1.GlobalNode) error) error { + var wg sync.WaitGroup + errChan := make(chan error, len(nodeInfos)) + + for _, nodeInfo := range nodeInfos { + wg.Add(1) + r.sem <- struct{}{} + go func(nodeInfo v1alpha1.GlobalNode) { + defer wg.Done() + defer func() { <-r.sem }() + if err := f(nodeInfo); err != nil { + errChan <- fmt.Errorf("[%s] batchprocessnodes failed: %s", nodeInfo.Name, err) + } + }(nodeInfo) + } + + wg.Wait() + close(errChan) + + var taskErr error + for err := range errChan { + if err != nil { + if taskErr == nil { + taskErr = err + } else { + taskErr = errors.Wrap(err, taskErr.Error()) + } + } + } + + if taskErr != nil { + return taskErr + } 
+ + return nil +} diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/logger.go b/pkg/kubenest/controller/virtualcluster.node.manager/workflow/task/logger.go similarity index 100% rename from pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/logger.go rename to pkg/kubenest/controller/virtualcluster.node.manager/workflow/task/logger.go diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go b/pkg/kubenest/controller/virtualcluster.node.manager/workflow/task/task.go similarity index 99% rename from pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go rename to pkg/kubenest/controller/virtualcluster.node.manager/workflow/task/task.go index ff549b0e1..ed377b92d 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task/task.go +++ b/pkg/kubenest/controller/virtualcluster.node.manager/workflow/task/task.go @@ -19,8 +19,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/env" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/exector" "github.com/kosmos.io/kosmos/pkg/kubenest/util" ) diff --git a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go b/pkg/kubenest/controller/virtualcluster.node.manager/workflow/workflow.go similarity index 98% rename from pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go rename to pkg/kubenest/controller/virtualcluster.node.manager/workflow/workflow.go index e8eba31b5..2e4e7ddda 100644 --- a/pkg/kubenest/controller/virtualcluster.node.controller/workflow/workflow.go +++ b/pkg/kubenest/controller/virtualcluster.node.manager/workflow/workflow.go @@ -5,7 +5,7 @@ import ( "time" "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/workflow/task" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/workflow/task" ) const ( diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 81afd4f01..1c4048ad2 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -36,8 +36,9 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" + vcnodecontroller "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager" + env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/env" + "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.manager/exector" "github.com/kosmos.io/kosmos/pkg/kubenest/tasks" "github.com/kosmos.io/kosmos/pkg/kubenest/util" apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" @@ -54,6 +55,7 @@ type VirtualClusterInitController struct { 
@@ -54,6 +55,7 @@ type VirtualClusterInitController struct {
 	KubeNestOptions *v1alpha1.KubeNestConfiguration
 	// CoreNamespaces is the namespaces of kubenest's core resources in vc cluster
 	CoreNamespaces []string
+	nodeManager     *vcnodecontroller.NodeManager
 }
 
 type NodePool struct {
@@ -84,6 +86,27 @@ var nameMap = map[string]int{
 	"adminport": 4,
 }
 
+func NewInitController(
+	client client.Client,
+	config *rest.Config,
+	eventRecorder record.EventRecorder,
+	rootClientSet kubernetes.Interface,
+	kosmosClient versioned.Interface,
+	kubeNestOptions *v1alpha1.KubeNestConfiguration,
+	coreNamespaces []string,
+) *VirtualClusterInitController {
+	return &VirtualClusterInitController{
+		Client:          client,
+		Config:          config,
+		EventRecorder:   eventRecorder,
+		RootClientSet:   rootClientSet,
+		KosmosClient:    kosmosClient,
+		KubeNestOptions: kubeNestOptions,
+		CoreNamespaces:  coreNamespaces,
+		nodeManager:     vcnodecontroller.NewNodeManager(client, rootClientSet, kubeNestOptions),
+	}
+}
+
 func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
 	startTime := time.Now()
 	klog.V(4).InfoS("Started syncing virtual cluster", "virtual cluster", request, "startTime", startTime)
@@ -104,27 +127,49 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re
 	//The object is being deleted
 	if !originalCluster.DeletionTimestamp.IsZero() {
-		if len(updatedCluster.Spec.PromoteResources.NodeInfos) > 0 {
-			updatedCluster.Spec.PromoteResources.NodeInfos = nil
-			updatedCluster.Status.Phase = v1alpha1.Deleting
+		updatedCluster.Status.Phase = v1alpha1.Deleting
+		updatedCluster.Spec.PromoteResources.NodeInfos = nil
+		err := c.Update(updatedCluster)
+		if err != nil {
+			klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
+			return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+		}
+
+		if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil {
+			updatedCluster.Status.Phase = v1alpha1.Pending
+			updatedCluster.Status.Reason = err.Error()
 			err := c.Update(updatedCluster)
 			if err != nil {
-				klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase)
-				return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name)
+				klog.Errorf("Error updating virtualcluster %s status to %s during deletion", updatedCluster.Name, updatedCluster.Status.Phase)
+				return reconcile.Result{}, errors.Wrapf(err, "Error updating virtualcluster %s status during deletion", updatedCluster.Name)
 			}
-			return reconcile.Result{}, nil
+			return reconcile.Result{}, err
 		}
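This record-the-reason-and-fall-back-to-Pending fallback recurs below in the AllNodeReady and Completed paths as well; the deleting branch continues after this note. A hedged sketch of how the repetition could be factored out (`markPending` is hypothetical, not part of this change):

```go
// markPending is a hypothetical helper: it records why reconciliation
// stalled, moves the VirtualCluster back to Pending, persists that, and
// hands the original cause back to the caller for requeue.
func (c *VirtualClusterInitController) markPending(vc *v1alpha1.VirtualCluster, cause error) error {
	vc.Status.Phase = v1alpha1.Pending
	vc.Status.Reason = cause.Error()
	if err := c.Update(vc); err != nil {
		return errors.Wrapf(err, "Error update virtualcluster %s status", vc.Name)
	}
	return cause
}
```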
err: %s", updatedCluster.Name, err.Error()) - } - return c.removeFinalizer(updatedCluster) - } else if updatedCluster.Status.Phase == v1alpha1.Deleting { - klog.V(2).InfoS("Virtual Cluster is deleting, wait for event 'AllNodeDeleted'", "Virtual Cluster", request) - return reconcile.Result{}, nil + // if err = c.nodeManager.NodeDelete(ctx, *updatedCluster); err != nil { + // updatedCluster.Status.Phase = v1alpha1.Pending + // updatedCluster.Status.Reason = err.Error() + // err := c.Update(updatedCluster) + // if err != nil { + // klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) + // return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + // } + // return reconcile.Result{}, err + // } + + klog.V(2).Infof(" all node is deleted, vc: %s", updatedCluster.Name) + + updatedCluster.Status.Phase = v1alpha1.AllNodeDeleted + err = c.Update(updatedCluster) + if err != nil { + klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) + return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + } + + err = c.destroyVirtualCluster(updatedCluster) + if err != nil { + klog.Errorf("Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error()) + return reconcile.Result{}, errors.Wrapf(err, "Destroy virtual cluter %s failed. err: %s", updatedCluster.Name, err.Error()) } return c.removeFinalizer(updatedCluster) } @@ -157,37 +202,19 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re klog.Errorf("Error update virtualcluster %s status to %s. %v", updatedCluster.Name, updatedCluster.Status.Phase, err) return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } - case v1alpha1.AllNodeReady: - name, namespace := request.Name, request.Namespace - // check if the vc enable vip - if len(originalCluster.Status.VipMap) > 0 { - // label node for keepalived - vcClient, err := tasks.GetVcClientset(c.RootClientSet, name, namespace) - if err != nil { - klog.Errorf("Get vc client failed. err: %s", err.Error()) - return reconcile.Result{}, errors.Wrapf(err, "Get vc client failed. err: %s", err.Error()) - } - reps, err := c.labelNode(vcClient) + if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil { + updatedCluster.Status.Phase = v1alpha1.Pending + updatedCluster.Status.Reason = err.Error() + err := c.Update(updatedCluster) if err != nil { - klog.Errorf("Label node for keepalived failed. err: %s", err.Error()) - return reconcile.Result{}, errors.Wrapf(err, "Label node for keepalived failed. 
err: %s", err.Error()) + klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) + return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } - klog.V(2).Infof("Label %d node for keepalived", reps) + return reconcile.Result{}, err } + name, namespace := request.Name, request.Namespace + return reconcile.Result{}, c.DoAllNodeReadyCheck(name, namespace, originalCluster, updatedCluster) - err := c.ensureCorePodsRunning(updatedCluster, constants.WaitCorePodsRunningTimeout) - if err != nil { - klog.Errorf("Check all pods running err: %s", err.Error()) - updatedCluster.Status.Reason = err.Error() - updatedCluster.Status.Phase = v1alpha1.Pending - } else { - updatedCluster.Status.Phase = v1alpha1.Completed - } - err = c.Update(updatedCluster) - if err != nil { - klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) - return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) - } case v1alpha1.Completed: //update request, check if promotepolicy nodes increase or decrease. // only 2 scenarios matched update request with status 'completed'. @@ -210,6 +237,18 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } + if err = c.nodeManager.NodeUpdate(ctx, *updatedCluster); err != nil { + updatedCluster.Status.Phase = v1alpha1.Pending + updatedCluster.Status.Reason = err.Error() + err := c.Update(updatedCluster) + if err != nil { + klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) + return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + } + return reconcile.Result{}, err + } + name, namespace := request.Name, request.Namespace + return reconcile.Result{}, c.DoAllNodeReadyCheck(name, namespace, originalCluster, updatedCluster) default: klog.Warningf("Skip virtualcluster %s reconcile status: %s", originalCluster.Name, originalCluster.Status.Phase) @@ -217,6 +256,39 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re return c.ensureFinalizer(updatedCluster) } +func (c *VirtualClusterInitController) DoAllNodeReadyCheck(name, namespace string, originalCluster, updatedCluster *v1alpha1.VirtualCluster) error { + // check if the vc enable vip + if len(originalCluster.Status.VipMap) > 0 { + // label node for keepalived + vcClient, err := tasks.GetVcClientset(c.RootClientSet, name, namespace) + if err != nil { + klog.Errorf("Get vc client failed. err: %s", err.Error()) + return errors.Wrapf(err, "Get vc client failed. err: %s", err.Error()) + } + reps, err := c.labelNode(vcClient) + if err != nil { + klog.Errorf("Label node for keepalived failed. err: %s", err.Error()) + return errors.Wrapf(err, "Label node for keepalived failed. 
err: %s", err.Error()) + } + klog.V(2).Infof("Label %d node for keepalived", reps) + } + + err := c.ensureCorePodsRunning(updatedCluster, constants.WaitCorePodsRunningTimeout) + if err != nil { + klog.Errorf("Check all pods running err: %s", err.Error()) + updatedCluster.Status.Reason = err.Error() + updatedCluster.Status.Phase = v1alpha1.Pending + } else { + updatedCluster.Status.Phase = v1alpha1.Completed + } + err = c.Update(updatedCluster) + if err != nil { + klog.Errorf("Error update virtualcluster %s status to %s", updatedCluster.Name, updatedCluster.Status.Phase) + return errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) + } + return nil +} + func (c *VirtualClusterInitController) SetupWithManager(mgr manager.Manager) error { return controllerruntime.NewControllerManagedBy(mgr). Named(constants.InitControllerName). @@ -254,18 +326,21 @@ func (c *VirtualClusterInitController) ensureFinalizer(virtualCluster *v1alpha1. if controllerutil.ContainsFinalizer(virtualCluster, VirtualClusterControllerFinalizer) { return reconcile.Result{}, nil } - current := &v1alpha1.VirtualCluster{} - if err := c.Client.Get(context.TODO(), types.NamespacedName{ - Namespace: virtualCluster.Namespace, - Name: virtualCluster.Name, - }, current); err != nil { - klog.Errorf("get virtualcluster %s error. %v", virtualCluster.Name, err) - return reconcile.Result{Requeue: true}, err - } - updated := current.DeepCopy() - controllerutil.AddFinalizer(updated, VirtualClusterControllerFinalizer) - err := c.Client.Update(context.TODO(), updated) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + current := &v1alpha1.VirtualCluster{} + if err := c.Client.Get(context.TODO(), types.NamespacedName{ + Namespace: virtualCluster.Namespace, + Name: virtualCluster.Name, + }, current); err != nil { + klog.Errorf("get virtualcluster %s error. %v", virtualCluster.Name, err) + return err + } + updated := current.DeepCopy() + controllerutil.AddFinalizer(updated, VirtualClusterControllerFinalizer) + return c.Client.Update(context.TODO(), updated) + }) + if err != nil { klog.Errorf("update virtualcluster %s error. %v", virtualCluster.Name, err) klog.Errorf("Failed to add finalizer to VirtualCluster %s/%s: %v", virtualCluster.Namespace, virtualCluster.Name, err) @@ -280,18 +355,20 @@ func (c *VirtualClusterInitController) removeFinalizer(virtualCluster *v1alpha1. return reconcile.Result{}, nil } - current := &v1alpha1.VirtualCluster{} - if err := c.Client.Get(context.TODO(), types.NamespacedName{ - Namespace: virtualCluster.Namespace, - Name: virtualCluster.Name, - }, current); err != nil { - klog.Errorf("get virtualcluster %s error. %v", virtualCluster.Name, err) - return reconcile.Result{Requeue: true}, err - } - updated := current.DeepCopy() + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + current := &v1alpha1.VirtualCluster{} + if err := c.Client.Get(context.TODO(), types.NamespacedName{ + Namespace: virtualCluster.Namespace, + Name: virtualCluster.Name, + }, current); err != nil { + klog.Errorf("get virtualcluster %s error. 
%v", virtualCluster.Name, err) + return err + } + updated := current.DeepCopy() - controllerutil.RemoveFinalizer(updated, VirtualClusterControllerFinalizer) - err := c.Client.Update(context.TODO(), updated) + controllerutil.RemoveFinalizer(updated, VirtualClusterControllerFinalizer) + return c.Client.Update(context.TODO(), updated) + }) if err != nil { klog.Errorf("Failed to remove finalizer to VirtualCluster %s/%s: %v", virtualCluster.Namespace, virtualCluster.Name, err) return reconcile.Result{Requeue: true}, err @@ -842,7 +919,6 @@ func createAPIAnpAgentSvc(name, namespace string, nameMap map[string]int) *corev } return apiAnpAgentSvc } - func (c *VirtualClusterInitController) GetNodePorts(client kubernetes.Interface, virtualCluster *v1alpha1.VirtualCluster) ([]int32, error) { ports := make([]int32, 5) ipFamilies := utils.IPFamilyGenerator(constants.APIServerServiceSubnet) diff --git a/pkg/kubenest/controlplane/service.go b/pkg/kubenest/controlplane/service.go index ce1efcccc..0fe8df7ee 100644 --- a/pkg/kubenest/controlplane/service.go +++ b/pkg/kubenest/controlplane/service.go @@ -15,7 +15,6 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/apiserver" - "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/coredns/host" "github.com/kosmos.io/kosmos/pkg/kubenest/manifest/controlplane/etcd" "github.com/kosmos.io/kosmos/pkg/kubenest/util" "github.com/kosmos.io/kosmos/pkg/utils" @@ -142,24 +141,24 @@ func createServerService(client clientset.Interface, name, namespace string, por return fmt.Errorf("err when creating etcd client service, err: %w", err) } - //core-dns service - coreDNSServiceBytes, err := util.ParseTemplate(host.CoreDNSService, struct { - Namespace string - }{ - Namespace: namespace, - }) - if err != nil { - return fmt.Errorf("error when parsing core-dns serive template: %w", err) - } - - coreDNSService := &corev1.Service{} - if err := yaml.Unmarshal([]byte(coreDNSServiceBytes), coreDNSService); err != nil { - return fmt.Errorf("err when decoding core-dns service: %w", err) - } - - if err := util.CreateOrUpdateService(client, coreDNSService); err != nil { - return fmt.Errorf("err when creating core-dns service, err: %w", err) - } + // //core-dns service + // coreDNSServiceBytes, err := util.ParseTemplate(host.CoreDNSService, struct { + // Namespace string + // }{ + // Namespace: namespace, + // }) + // if err != nil { + // return fmt.Errorf("error when parsing core-dns serive template: %w", err) + // } + + // coreDNSService := &corev1.Service{} + // if err := yaml.Unmarshal([]byte(coreDNSServiceBytes), coreDNSService); err != nil { + // return fmt.Errorf("err when decoding core-dns service: %w", err) + // } + + // if err := util.CreateOrUpdateService(client, coreDNSService); err != nil { + // return fmt.Errorf("err when creating core-dns service, err: %w", err) + // } return nil } diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index f086af28c..447fdcf15 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -84,11 +84,9 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase { func NewUpdateCertPhase(opts *InitOptions) *workflow.Phase { initPhase := workflow.NewPhase() - initPhase.AppendTask(tasks.NewBackupCertTask()) initPhase.AppendTask(tasks.NewRenewCertTask()) initPhase.AppendTask(tasks.NewUploadCertsTask()) initPhase.AppendTask(tasks.NewUploadKubeconfigTask()) - initPhase.AppendTask(tasks.NewRenewCertsTask()) 
diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go
index f086af28c..447fdcf15 100644
--- a/pkg/kubenest/init.go
+++ b/pkg/kubenest/init.go
@@ -84,11 +84,9 @@ func NewInitPhase(opts *InitOptions) *workflow.Phase {
 
 func NewUpdateCertPhase(opts *InitOptions) *workflow.Phase {
 	initPhase := workflow.NewPhase()
-	initPhase.AppendTask(tasks.NewBackupCertTask())
 	initPhase.AppendTask(tasks.NewRenewCertTask())
 	initPhase.AppendTask(tasks.NewUploadCertsTask())
 	initPhase.AppendTask(tasks.NewUploadKubeconfigTask())
-	initPhase.AppendTask(tasks.NewRenewCertsTask())
 	initPhase.SetDataInitializer(func() (workflow.RunData, error) {
 		return newRunData(opts)
diff --git a/pkg/kubenest/tasks/endpoint.go b/pkg/kubenest/tasks/endpoint.go
index be78866a7..46730d390 100644
--- a/pkg/kubenest/tasks/endpoint.go
+++ b/pkg/kubenest/tasks/endpoint.go
@@ -21,6 +21,9 @@ func NewEndPointTask() workflow.Task {
 		Name:        "endpoint",
 		Run:         runEndpoint,
 		RunSubTasks: true,
+		Skip: func(_ workflow.RunData) (bool, error) {
+			return true, nil
+		},
 		Tasks: []workflow.Task{
 			{
 				Name: "deploy-endpoint-in-virtual-cluster",
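The Skip hook added to the endpoint task unconditionally returns true, so the task and its subtasks stay wired into the phase but never execute. Assuming the runner consults Skip the way kubeadm-style workflows do (this is inferred, not quoted from pkg/kubenest/workflow), the check looks roughly like:

```go
// Minimal, self-contained model of a workflow runner honouring a Skip
// hook; the Task shape mirrors the fields used above (Name, Run, Skip).
package main

import "fmt"

type RunData any

type Task struct {
	Name string
	Run  func(RunData) error
	Skip func(RunData) (bool, error)
}

func runPhase(tasks []Task, d RunData) error {
	for _, t := range tasks {
		if t.Skip != nil {
			skip, err := t.Skip(d)
			if err != nil {
				return err
			}
			if skip {
				continue // task stays registered but is not executed
			}
		}
		if err := t.Run(d); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	tasks := []Task{
		{
			Name: "endpoint",
			Run:  func(RunData) error { return fmt.Errorf("should not run") },
			Skip: func(RunData) (bool, error) { return true, nil },
		},
		{Name: "other", Run: func(RunData) error { fmt.Println("ran other"); return nil }},
	}
	fmt.Println(runPhase(tasks, nil)) // prints "ran other", then <nil>
}
```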