diff --git a/.github/workflows/chart.yml b/.github/workflows/chart.yml index a2d709555..f3022c580 100644 --- a/.github/workflows/chart.yml +++ b/.github/workflows/chart.yml @@ -18,7 +18,7 @@ jobs: deploy: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v6.0.1 + - uses: actions/checkout@v6.0.2 with: submodules: true fetch-depth: 0 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1587f61fe..64626ae62 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ on: paths-ignore: [docs/**, "**.md", "**.mdx", "**.png", "**.jpg"] env: - GO_VERSION: '1.24.9' + GO_VERSION: '1.24.12' CERT_MANAGER_VERSION: 'v1.16.2' jobs: @@ -41,7 +41,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Check out code into the Go module directory - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 - name: Set up Ginkgo CLI run: | @@ -91,7 +91,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Check out code into the Go module directory - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 - name: Move Docker data directory to /mnt # The default storage device on GitHub-hosted runners is running low during e2e tests. diff --git a/.github/workflows/code-lint.yml b/.github/workflows/code-lint.yml index ac3906219..65908017b 100644 --- a/.github/workflows/code-lint.yml +++ b/.github/workflows/code-lint.yml @@ -14,7 +14,7 @@ on: env: # Common versions - GO_VERSION: '1.24.9' + GO_VERSION: '1.24.12' jobs: @@ -43,7 +43,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Checkout - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 with: submodules: true @@ -64,7 +64,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Check out code into the Go module directory - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 - name: golangci-lint run: make lint diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 91deab101..2f47f189e 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -38,7 +38,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 # Initializes the CodeQL tools for scanning. 
- name: Initialize CodeQL diff --git a/.github/workflows/codespell.yml b/.github/workflows/codespell.yml index b4aa7b8b0..aac7f666b 100644 --- a/.github/workflows/codespell.yml +++ b/.github/workflows/codespell.yml @@ -16,7 +16,7 @@ jobs: with: egress-policy: audit - - uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v4.1.7 + - uses: actions/checkout@0c366fd6a839edf440554fa01a7085ccba70ac98 # v4.1.7 - uses: codespell-project/actions-codespell@8f01853be192eb0f849a5c7d721450e7a467c579 # master with: check_filenames: true diff --git a/.github/workflows/markdown-lint.yml b/.github/workflows/markdown-lint.yml index bb2858063..d1ddd51d0 100644 --- a/.github/workflows/markdown-lint.yml +++ b/.github/workflows/markdown-lint.yml @@ -10,7 +10,7 @@ jobs: markdown-link-check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v6.0.1 + - uses: actions/checkout@v6.0.2 - uses: tcort/github-action-markdown-link-check@v1 with: # this will only show errors in the output diff --git a/.github/workflows/trivy.yml b/.github/workflows/trivy.yml index 995d8dd19..3cf096830 100644 --- a/.github/workflows/trivy.yml +++ b/.github/workflows/trivy.yml @@ -18,7 +18,7 @@ env: MEMBER_AGENT_IMAGE_NAME: member-agent REFRESH_TOKEN_IMAGE_NAME: refresh-token - GO_VERSION: '1.24.9' + GO_VERSION: '1.24.12' jobs: export-registry: @@ -44,7 +44,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Checkout code - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 - name: Login to ${{ env.REGISTRY }} uses: docker/login-action@5e57cd118135c172c3672efd75eb46360885c0ef diff --git a/.github/workflows/upgrade.yml b/.github/workflows/upgrade.yml index 1bfaff9df..5268175a3 100644 --- a/.github/workflows/upgrade.yml +++ b/.github/workflows/upgrade.yml @@ -17,7 +17,7 @@ on: paths-ignore: [docs/**, "**.md", "**.mdx", "**.png", "**.jpg"] env: - GO_VERSION: '1.24.9' + GO_VERSION: '1.24.12' jobs: detect-noop: @@ -44,7 +44,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Check out code into the Go module directory - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 with: # Fetch the history of all branches and tags. # This is needed for the test suite to switch between releases. @@ -146,7 +146,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Check out code into the Go module directory - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 with: # Fetch the history of all branches and tags. # This is needed for the test suite to switch between releases. @@ -248,7 +248,7 @@ jobs: go-version: ${{ env.GO_VERSION }} - name: Check out code into the Go module directory - uses: actions/checkout@v6.0.1 + uses: actions/checkout@v6.0.2 with: # Fetch the history of all branches and tags. # This is needed for the test suite to switch between releases. diff --git a/.golangci.yml b/.golangci.yml index a1b9bbc3a..d8c09541b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ run: timeout: 15m - go: '1.24.9' + go: '1.24.12' linters-settings: stylecheck: diff --git a/charts/member-agent/README.md b/charts/member-agent/README.md index 4c74d64fb..db1d894c5 100644 --- a/charts/member-agent/README.md +++ b/charts/member-agent/README.md @@ -42,8 +42,17 @@ helm upgrade member-agent member-agent/ --namespace fleet-system | logVerbosity | Log level. 
Uses V logs (klog) | `3` | | propertyProvider | The property provider to use with the member agent; if none is specified, the Fleet member agent will start with no property provider (i.e., the agent will expose no cluster properties, and collect only limited resource usage information) | `` | | region | The region where the member cluster resides | `` | +| workApplierRequeueRateLimiterAttemptsWithFixedDelay | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies the number of attempts to requeue with a fixed delay before switching to exponential backoff | `1` | +| workApplierRequeueRateLimiterFixedDelaySeconds | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies the fixed delay in seconds for initial requeue attempts | `5` | +| workApplierRequeueRateLimiterExponentialBaseForSlowBackoff | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies the exponential base for the slow backoff stage | `1.2` | +| workApplierRequeueRateLimiterInitialSlowBackoffDelaySeconds | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies the initial delay in seconds for the slow backoff stage | `2` | +| workApplierRequeueRateLimiterMaxSlowBackoffDelaySeconds | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies the maximum delay in seconds for the slow backoff stage | `15` | +| workApplierRequeueRateLimiterExponentialBaseForFastBackoff | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies the exponential base for the fast backoff stage | `1.5` | +| workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies the maximum delay in seconds for the fast backoff stage | `900` | +| workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs | This parameter is one of a set of values that control how frequently KubeFleet reconciles (processes) manifests; it specifies whether to skip the slow backoff stage and start fast backoff immediately for available or diff-reported work objects | `true` | | config.azureCloudConfig | The cloud provider configuration | **required if property provider is set to azure** | + ## Override Azure cloud config **If PropertyProvider feature is set to azure, then a cloud configuration is required.** diff --git a/charts/member-agent/templates/deployment.yaml b/charts/member-agent/templates/deployment.yaml index 065db7fa8..6b4257d56 100644 --- a/charts/member-agent/templates/deployment.yaml +++ b/charts/member-agent/templates/deployment.yaml @@ -45,6 +45,30 @@ spec: - --enable-pprof={{ .Values.enablePprof }} - --pprof-port={{ .Values.pprofPort }} - --hub-pprof-port={{ .Values.hubPprofPort }} + {{- if .Values.workApplierRequeueRateLimiterAttemptsWithFixedDelay }} + - --work-applier-requeue-rate-limiter-attempts-with-fixed-delay={{ .Values.workApplierRequeueRateLimiterAttemptsWithFixedDelay }} + {{- end }} + {{- if .Values.workApplierRequeueRateLimiterFixedDelaySeconds }} + - --work-applier-requeue-rate-limiter-fixed-delay-seconds={{ .Values.workApplierRequeueRateLimiterFixedDelaySeconds }} + {{- end }} + {{- if .Values.workApplierRequeueRateLimiterExponentialBaseForSlowBackoff
}} + - --work-applier-requeue-rate-limiter-exponential-base-for-slow-backoff={{ .Values.workApplierRequeueRateLimiterExponentialBaseForSlowBackoff }} + {{- end }} + {{- if .Values.workApplierRequeueRateLimiterInitialSlowBackoffDelaySeconds }} + - --work-applier-requeue-rate-limiter-initial-slow-backoff-delay-seconds={{ .Values.workApplierRequeueRateLimiterInitialSlowBackoffDelaySeconds }} + {{- end }} + {{- if .Values.workApplierRequeueRateLimiterMaxSlowBackoffDelaySeconds }} + - --work-applier-requeue-rate-limiter-max-slow-backoff-delay-seconds={{ .Values.workApplierRequeueRateLimiterMaxSlowBackoffDelaySeconds }} + {{- end }} + {{- if .Values.workApplierRequeueRateLimiterExponentialBaseForFastBackoff }} + - --work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff={{ .Values.workApplierRequeueRateLimiterExponentialBaseForFastBackoff }} + {{- end }} + {{- if .Values.workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds }} + - --work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds={{ .Values.workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds }} + {{- end }} + {{- if .Values.workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs }} + - --work-applier-requeue-rate-limiter-skip-to-fast-backoff-for-available-or-diff-reported-work-objs={{ .Values.workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs }} + {{- end }} {{- if .Values.propertyProvider }} - --property-provider={{ .Values.propertyProvider }} {{- end }} diff --git a/cmd/hubagent/workload/setup.go b/cmd/hubagent/workload/setup.go index 3c9d48383..39203217e 100644 --- a/cmd/hubagent/workload/setup.go +++ b/cmd/hubagent/workload/setup.go @@ -161,18 +161,21 @@ func SetupControllers(ctx context.Context, wg *sync.WaitGroup, mgr ctrl.Manager, validator.RestMapper = mgr.GetRESTMapper() // webhook needs this to validate GVK of resource selector // Set up a custom controller to reconcile placement objects + resourceSelectorResolver := controller.ResourceSelectorResolver{ + RestMapper: mgr.GetRESTMapper(), + InformerManager: dynamicInformerManager, + ResourceConfig: resourceConfig, + SkippedNamespaces: skippedNamespaces, + EnableWorkload: opts.EnableWorkload, + } pc := &placement.Reconciler{ Client: mgr.GetClient(), Recorder: mgr.GetEventRecorderFor(placementControllerName), - RestMapper: mgr.GetRESTMapper(), - InformerManager: dynamicInformerManager, - ResourceConfig: resourceConfig, - SkippedNamespaces: skippedNamespaces, Scheme: mgr.GetScheme(), UncachedReader: mgr.GetAPIReader(), + ResourceSelectorResolver: resourceSelectorResolver, ResourceSnapshotCreationMinimumInterval: opts.ResourceSnapshotCreationMinimumInterval, ResourceChangesCollectionDuration: opts.ResourceChangesCollectionDuration, - EnableWorkload: opts.EnableWorkload, } rateLimiter := options.DefaultControllerRateLimiter(opts.RateLimiterOpts) diff --git a/docker/crd-installer.Dockerfile b/docker/crd-installer.Dockerfile index 3055a235f..07d8c6a6c 100644 --- a/docker/crd-installer.Dockerfile +++ b/docker/crd-installer.Dockerfile @@ -1,5 +1,5 @@ # Build the crdinstaller binary -FROM mcr.microsoft.com/oss/go/microsoft/golang:1.24.9 AS builder +FROM mcr.microsoft.com/oss/go/microsoft/golang:1.24.12 AS builder ARG GOOS=linux ARG GOARCH=amd64 diff --git a/docker/hub-agent.Dockerfile b/docker/hub-agent.Dockerfile index d03fcc093..c2c848114 100644 --- a/docker/hub-agent.Dockerfile +++ b/docker/hub-agent.Dockerfile @@ -1,5 +1,5 @@ # Build the hubagent binary -FROM 
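Note on the new chart values: they map one-to-one onto the member agent's --work-applier-requeue-rate-limiter-* flags shown in the template above. As a rough illustration of how the three stages are meant to combine, here is a sketch built from the chart defaults; the constant names, the requeueDelay helper, and the staging logic are illustrative assumptions, not the actual KubeFleet work applier rate limiter. The skip-to-fast-backoff value, when true, would bypass the slow stage entirely for Work objects that are already available or have reported diffs.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative only: a staged requeue delay using the chart's default values.
const (
	attemptsWithFixedDelay = 1                 // workApplierRequeueRateLimiterAttemptsWithFixedDelay
	fixedDelay             = 5 * time.Second   // workApplierRequeueRateLimiterFixedDelaySeconds
	slowBase               = 1.2               // workApplierRequeueRateLimiterExponentialBaseForSlowBackoff
	initialSlowDelay       = 2 * time.Second   // workApplierRequeueRateLimiterInitialSlowBackoffDelaySeconds
	maxSlowDelay           = 15 * time.Second  // workApplierRequeueRateLimiterMaxSlowBackoffDelaySeconds
	fastBase               = 1.5               // workApplierRequeueRateLimiterExponentialBaseForFastBackoff
	maxFastDelay           = 900 * time.Second // workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds
)

// requeueDelay returns the delay used for the given 0-based requeue attempt:
// a few attempts at a fixed delay, then slow exponential backoff until the slow
// cap is reached, then fast exponential backoff up to the fast cap.
func requeueDelay(attempt int) time.Duration {
	if attempt < attemptsWithFixedDelay {
		return fixedDelay // stage 1: fixed delay
	}
	delay := initialSlowDelay
	for i := attemptsWithFixedDelay; i < attempt; i++ {
		if delay < maxSlowDelay {
			delay = time.Duration(float64(delay) * slowBase) // stage 2: slow backoff
		} else {
			delay = time.Duration(float64(delay) * fastBase) // stage 3: fast backoff
			if delay > maxFastDelay {
				delay = maxFastDelay
			}
		}
	}
	return delay
}

func main() {
	for attempt := 0; attempt < 12; attempt++ {
		fmt.Printf("attempt %2d -> requeue after %v\n", attempt, requeueDelay(attempt))
	}
}
```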
mcr.microsoft.com/oss/go/microsoft/golang:1.24.9 AS builder +FROM mcr.microsoft.com/oss/go/microsoft/golang:1.24.12 AS builder ARG GOOS=linux ARG GOARCH=amd64 diff --git a/docker/member-agent.Dockerfile b/docker/member-agent.Dockerfile index b28b2cdff..34cd3ce16 100644 --- a/docker/member-agent.Dockerfile +++ b/docker/member-agent.Dockerfile @@ -1,5 +1,5 @@ # Build the memberagent binary -FROM mcr.microsoft.com/oss/go/microsoft/golang:1.24.9 AS builder +FROM mcr.microsoft.com/oss/go/microsoft/golang:1.24.12 AS builder ARG GOOS=linux ARG GOARCH=amd64 diff --git a/docker/refresh-token.Dockerfile b/docker/refresh-token.Dockerfile index 4d7aa7d83..05b88c09d 100644 --- a/docker/refresh-token.Dockerfile +++ b/docker/refresh-token.Dockerfile @@ -1,5 +1,5 @@ # Build the refreshtoken binary -FROM mcr.microsoft.com/oss/go/microsoft/golang:1.24.9 AS builder +FROM mcr.microsoft.com/oss/go/microsoft/golang:1.24.12 AS builder ARG GOOS="linux" ARG GOARCH="amd64" diff --git a/go.mod b/go.mod index c8524cc40..d25bc0a81 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module go.goms.io/fleet -go 1.24.9 +go 1.24.12 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.0 @@ -42,6 +42,7 @@ require ( sigs.k8s.io/cloud-provider-azure/pkg/azclient v0.5.20 sigs.k8s.io/cluster-inventory-api v0.0.0-20251028164203-2e3fabb46733 sigs.k8s.io/controller-runtime v0.22.4 + sigs.k8s.io/yaml v1.6.0 ) require ( @@ -134,7 +135,6 @@ require ( sigs.k8s.io/kustomize/kyaml v0.18.1 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect - sigs.k8s.io/yaml v1.6.0 // indirect ) replace ( diff --git a/pkg/controllers/clusterinventory/clusterprofile/controller.go b/pkg/controllers/clusterinventory/clusterprofile/controller.go index 56e2219ce..1e92e4858 100644 --- a/pkg/controllers/clusterinventory/clusterprofile/controller.go +++ b/pkg/controllers/clusterinventory/clusterprofile/controller.go @@ -39,6 +39,7 @@ import ( clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" "go.goms.io/fleet/pkg/propertyprovider" + "go.goms.io/fleet/pkg/utils/condition" "go.goms.io/fleet/pkg/utils/controller" ) @@ -110,6 +111,13 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{}, nil } + // Check if the MemberCluster has joined. + joinedCondition := meta.FindStatusCondition(mc.Status.Conditions, string(clusterv1beta1.ConditionTypeMemberClusterJoined)) + if !condition.IsConditionStatusTrue(joinedCondition, mc.Generation) { + klog.V(2).InfoS("Member cluster has not joined; skip cluster profile reconciliation", "memberCluster", mcRef) + return ctrl.Result{}, nil + } + // Check if the MemberCluster object has the cleanup finalizer; if not, add it. if !controllerutil.ContainsFinalizer(mc, clusterProfileCleanupFinalizer) { mc.Finalizers = append(mc.Finalizers, clusterProfileCleanupFinalizer) @@ -175,6 +183,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // fillInClusterStatus fills in the ClusterProfile status fields from the MemberCluster status. // Currently, it only fills in the Kubernetes version field. 
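Note on the new guards in the cluster profile controller (the Joined gate in Reconcile and the property-collection gate in fillInClusterStatus below): both rely on condition.IsConditionStatusTrue from go.goms.io/fleet/pkg/utils/condition being generation-aware. The sketch below shows the assumed semantics only; the isConditionStatusTrue helper and the plain "Joined" type string are stand-ins for the library helper and clusterv1beta1.ConditionTypeMemberClusterJoined, not the actual Fleet code.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isConditionStatusTrue sketches the assumed semantics: the condition must
// exist, be True, and have been observed at the object's current generation,
// so a stale condition from an older spec does not unblock reconciliation.
func isConditionStatusTrue(cond *metav1.Condition, generation int64) bool {
	return cond != nil &&
		cond.Status == metav1.ConditionTrue &&
		cond.ObservedGeneration == generation
}

func main() {
	conditions := []metav1.Condition{
		{Type: "Joined", Status: metav1.ConditionTrue, ObservedGeneration: 2},
	}

	joined := meta.FindStatusCondition(conditions, "Joined")
	fmt.Println(isConditionStatusTrue(joined, 2)) // true: observed at the current generation
	fmt.Println(isConditionStatusTrue(joined, 3)) // false: the condition is stale
	fmt.Println(isConditionStatusTrue(nil, 2))    // false: the condition is missing
}
```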
func (r *Reconciler) fillInClusterStatus(mc *clusterv1beta1.MemberCluster, cp *clusterinventory.ClusterProfile) { + clusterPropertyCondition := meta.FindStatusCondition(mc.Status.Conditions, string(clusterv1beta1.ConditionTypeClusterPropertyCollectionSucceeded)) + if !condition.IsConditionStatusTrue(clusterPropertyCondition, mc.Generation) { + klog.V(3).InfoS("Cluster property collection has not succeeded; skip updating the cluster profile status", "memberCluster", klog.KObj(mc), "clusterProfile", klog.KObj(cp)) + return + } + k8sversion, exists := mc.Status.Properties[propertyprovider.K8sVersionProperty] if exists { klog.V(3).InfoS("Get Kubernetes version from member cluster status", "kubernetesVersion", k8sversion.Value, "clusterProfile", klog.KObj(cp)) diff --git a/pkg/controllers/clusterinventory/clusterprofile/controller_integration_test.go b/pkg/controllers/clusterinventory/clusterprofile/controller_integration_test.go index 532b20a12..e97e35646 100644 --- a/pkg/controllers/clusterinventory/clusterprofile/controller_integration_test.go +++ b/pkg/controllers/clusterinventory/clusterprofile/controller_integration_test.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" + "go.goms.io/fleet/pkg/propertyprovider" "go.goms.io/fleet/pkg/utils" "go.goms.io/fleet/pkg/utils/condition" "go.goms.io/fleet/pkg/utils/controller" @@ -62,6 +63,22 @@ var _ = Describe("Test ClusterProfile Controller", func() { }) It("Should create a clusterProfile when a member cluster is created", func() { + By("Check the clusterProfile is not created") + Consistently(func() error { + return k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile) + }, consistentlyDuration, interval).ShouldNot(Succeed(), "clusterProfile is created before member cluster is marked as joined") + By("Mark the member cluster as joined") + mc.Status.Conditions = []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeMemberClusterJoined), + Status: metav1.ConditionTrue, + Reason: "Joined", + Message: "Member cluster has joined", + LastTransitionTime: metav1.Time{Time: time.Now()}, + ObservedGeneration: mc.Generation, + }, + } + Expect(k8sClient.Status().Update(ctx, mc)).Should(Succeed(), "failed to update member cluster status") By("Check the clusterProfile is created") Eventually(func() error { return k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile) @@ -79,6 +96,7 @@ var _ = Describe("Test ClusterProfile Controller", func() { Reason: "Healthy", Message: "Agent is healthy", LastTransitionTime: metav1.Time{Time: time.Now()}, + ObservedGeneration: mc.Generation, }, }, LastReceivedHeartbeat: metav1.Time{Time: time.Now()}, @@ -100,7 +118,19 @@ }, eventuallyTimeout, interval).Should(BeTrue(), "clusterProfile is not created") }) - It("Should recreate a clusterProfile when it is deleted by the user", func() { + It("Should recreate a clusterProfile when it is deleted by the user but properties should not show if MC property collection has not succeeded", func() { + By("Mark the member cluster as joined") + mc.Status.Conditions = []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeMemberClusterJoined), + Status: metav1.ConditionTrue, + Reason: "Joined", + Message: "Member cluster has joined", + LastTransitionTime: metav1.Time{Time: time.Now()}, + ObservedGeneration:
mc.Generation, + }, + } + Expect(k8sClient.Status().Update(ctx, mc)).Should(Succeed(), "failed to update member cluster status") By("Check the clusterProfile is created") Eventually(func() error { return k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile) @@ -111,13 +141,53 @@ var _ = Describe("Test ClusterProfile Controller", func() { Eventually(func() error { return k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile) }, eventuallyTimeout, interval).Should(Succeed(), "clusterProfile is not created") + By("Check the properties are not created") + Consistently(func() bool { + if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile); err != nil { + return false + } + return clusterProfile.Status.AccessProviders == nil || clusterProfile.Status.AccessProviders[0].Cluster.CertificateAuthorityData == nil + }, consistentlyDuration, interval).Should(BeTrue(), "ClusterCertificateAuthority property is created before member cluster is marked as collection succeeded") }) - It("Should update a clusterProfile when it is modified by the user", func() { + It("Should have property filled in clusterProfile created from MemberCluster and reconcile the clusterProfile if changed", func() { + By("Mark the member cluster as joined") + mc.Status.Conditions = []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeClusterPropertyCollectionSucceeded), + Status: metav1.ConditionTrue, + Reason: "CollectionSucceeded", + Message: "Cluster property collection succeeded", + LastTransitionTime: metav1.Time{Time: time.Now()}, + ObservedGeneration: mc.Generation, + }, + { + Type: string(clusterv1beta1.ConditionTypeMemberClusterJoined), + Status: metav1.ConditionTrue, + Reason: "Joined", + Message: "Member cluster has joined", + LastTransitionTime: metav1.Time{Time: time.Now()}, + ObservedGeneration: mc.Generation, + }, + } + mc.Status.Properties = map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{ + propertyprovider.ClusterCertificateAuthorityProperty: { + Value: "dummy-ca-data", + ObservationTime: metav1.Time{Time: time.Now()}, + }, + } + Expect(k8sClient.Status().Update(ctx, mc)).Should(Succeed(), "failed to update member cluster status") By("Check the clusterProfile is created") Eventually(func() error { return k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile) }, eventuallyTimeout, interval).Should(Succeed(), "clusterProfile is not created") + By("Check the properties in clusterProfile") + Eventually(func() bool { + if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile); err != nil { + return false + } + return string(clusterProfile.Status.AccessProviders[0].Cluster.CertificateAuthorityData) == "dummy-ca-data" + }, eventuallyTimeout, interval).Should(BeTrue(), "ClusterCertificateAuthority property is not created") By("Modifying the ClusterProfile") clusterProfile.Spec.DisplayName = "ModifiedMCName" Expect(k8sClient.Update(ctx, &clusterProfile)).Should(Succeed(), "failed to modify clusterProfile") @@ -131,6 +201,18 @@ var _ = Describe("Test ClusterProfile Controller", func() { }) It("Should delete the clusterProfile when the MemberCluster is deleted", func() { + By("Mark the member cluster as joined") + mc.Status.Conditions = []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeMemberClusterJoined), + Status: 
metav1.ConditionTrue, + Reason: "Joined", + Message: "Member cluster has joined", + LastTransitionTime: metav1.Time{Time: time.Now()}, + ObservedGeneration: mc.Generation, + }, + } + Expect(k8sClient.Status().Update(ctx, mc)).Should(Succeed(), "failed to update member cluster status") By("Check the clusterProfile is created") Eventually(func() error { return k8sClient.Get(ctx, types.NamespacedName{Namespace: clusterProfileNS, Name: testMCName}, &clusterProfile) diff --git a/pkg/controllers/clusterinventory/clusterprofile/controller_test.go b/pkg/controllers/clusterinventory/clusterprofile/controller_test.go index 7c53ddf72..3574f4607 100644 --- a/pkg/controllers/clusterinventory/clusterprofile/controller_test.go +++ b/pkg/controllers/clusterinventory/clusterprofile/controller_test.go @@ -20,13 +20,194 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clusterinventory "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1" clusterv1beta1 "go.goms.io/fleet/apis/cluster/v1beta1" + "go.goms.io/fleet/pkg/propertyprovider" + "go.goms.io/fleet/pkg/utils/controller" ) +func TestFillInClusterStatus(t *testing.T) { + reconciler := &Reconciler{} + + tests := []struct { + name string + memberCluster *clusterv1beta1.MemberCluster + clusterProfile *clusterinventory.ClusterProfile + expectVersion bool + expectedK8sVersion string + expectAccessProvider bool + expectedServer string + expectedCAData string + }{ + { + name: "Cluster property collection has not succeeded", + memberCluster: &clusterv1beta1.MemberCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Generation: 1, + }, + Status: clusterv1beta1.MemberClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeClusterPropertyCollectionSucceeded), + Status: metav1.ConditionFalse, + ObservedGeneration: 1, + }, + }, + }, + }, + clusterProfile: &clusterinventory.ClusterProfile{}, + expectVersion: false, + expectAccessProvider: false, + }, + { + name: "Cluster property collection succeeded but no properties", + memberCluster: &clusterv1beta1.MemberCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Generation: 1, + }, + Status: clusterv1beta1.MemberClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeClusterPropertyCollectionSucceeded), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + }, + }, + Properties: map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{}, + }, + }, + clusterProfile: &clusterinventory.ClusterProfile{}, + expectVersion: false, + expectAccessProvider: true, + }, + { + name: "Cluster property collection succeeded with k8s version only", + memberCluster: &clusterv1beta1.MemberCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Generation: 1, + }, + Status: clusterv1beta1.MemberClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeClusterPropertyCollectionSucceeded), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + }, + }, + Properties: map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{ + propertyprovider.K8sVersionProperty: { + Value: "v1.28.0", + }, + }, + }, + }, + clusterProfile: &clusterinventory.ClusterProfile{}, + expectVersion: true, + expectedK8sVersion: "v1.28.0", + expectAccessProvider: true, + }, + { + name: "Cluster property collection succeeded with all properties", + memberCluster: &clusterv1beta1.MemberCluster{ + ObjectMeta: 
metav1.ObjectMeta{ + Name: "test-cluster", + Generation: 1, + }, + Status: clusterv1beta1.MemberClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeClusterPropertyCollectionSucceeded), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + }, + }, + Properties: map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{ + propertyprovider.K8sVersionProperty: { + Value: "v1.29.1", + }, + propertyprovider.ClusterEntryPointProperty: { + Value: "https://api.test-cluster.example.com:6443", + }, + propertyprovider.ClusterCertificateAuthorityProperty: { + Value: "dGVzdC1jYS1kYXRh", + }, + }, + }, + }, + clusterProfile: &clusterinventory.ClusterProfile{}, + expectVersion: true, + expectedK8sVersion: "v1.29.1", + expectAccessProvider: true, + expectedServer: "https://api.test-cluster.example.com:6443", + expectedCAData: "dGVzdC1jYS1kYXRh", + }, + { + name: "Cluster property collection succeeded with partial properties", + memberCluster: &clusterv1beta1.MemberCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-cluster", + Generation: 1, + }, + Status: clusterv1beta1.MemberClusterStatus{ + Conditions: []metav1.Condition{ + { + Type: string(clusterv1beta1.ConditionTypeClusterPropertyCollectionSucceeded), + Status: metav1.ConditionTrue, + ObservedGeneration: 1, + }, + }, + Properties: map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{ + propertyprovider.K8sVersionProperty: { + Value: "v1.27.5", + }, + propertyprovider.ClusterEntryPointProperty: { + Value: "https://api.partial-cluster.example.com:6443", + }, + }, + }, + }, + clusterProfile: &clusterinventory.ClusterProfile{}, + expectVersion: true, + expectedK8sVersion: "v1.27.5", + expectAccessProvider: true, + expectedServer: "https://api.partial-cluster.example.com:6443", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reconciler.fillInClusterStatus(tt.memberCluster, tt.clusterProfile) + + expected := clusterinventory.ClusterProfileStatus{} + if tt.expectVersion { + expected.Version.Kubernetes = tt.expectedK8sVersion + } + if tt.expectAccessProvider { + expected.AccessProviders = []clusterinventory.AccessProvider{{ + Name: controller.ClusterManagerName, + }} + if tt.expectedServer != "" { + expected.AccessProviders[0].Cluster.Server = tt.expectedServer + } + if tt.expectedCAData != "" { + expected.AccessProviders[0].Cluster.CertificateAuthorityData = []byte(tt.expectedCAData) + } + } + + if diff := cmp.Diff(expected, tt.clusterProfile.Status); diff != "" { + t.Fatalf("test case `%s` failed diff (-want +got):\n%s", tt.name, diff) + } + }) + } +} + func TestSyncClusterProfileCondition(t *testing.T) { clusterUnhealthyThreshold := 5 * time.Minute reconciler := &Reconciler{ diff --git a/pkg/controllers/clusterinventory/clusterprofile/suite_test.go b/pkg/controllers/clusterinventory/clusterprofile/suite_test.go index c4da1db5f..f1745f443 100644 --- a/pkg/controllers/clusterinventory/clusterprofile/suite_test.go +++ b/pkg/controllers/clusterinventory/clusterprofile/suite_test.go @@ -25,14 +25,15 @@ import ( . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "go.uber.org/zap/zapcore" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/klog/v2" - "k8s.io/klog/v2/textlogger" clusterinventory "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/server" @@ -62,6 +63,10 @@ var _ = BeforeSuite(func() { klog.InitFlags(fs) Expect(fs.Parse([]string{"--v", "5", "-add_dir_header", "true"})).Should(Succeed()) + logger := zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true), zap.Level(zapcore.Level(-5))) + klog.SetLogger(logger) + ctrl.SetLogger(logger) + By("bootstrapping test environment") testEnv = &envtest.Environment{ CRDDirectoryPaths: []string{filepath.Join("../../../../", "config", "crd", "bases")}, @@ -92,7 +97,7 @@ var _ = BeforeSuite(func() { Metrics: server.Options{ BindAddress: "0", }, - Logger: textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(4))), + Logger: logger, }) Expect(err).Should(Succeed()) err = (&Reconciler{ diff --git a/pkg/controllers/placement/controller.go b/pkg/controllers/placement/controller.go index 5b9bb6930..189b75f23 100644 --- a/pkg/controllers/placement/controller.go +++ b/pkg/controllers/placement/controller.go @@ -42,12 +42,10 @@ import ( fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" hubmetrics "go.goms.io/fleet/pkg/metrics/hub" "go.goms.io/fleet/pkg/scheduler/queue" - "go.goms.io/fleet/pkg/utils" "go.goms.io/fleet/pkg/utils/annotations" "go.goms.io/fleet/pkg/utils/condition" "go.goms.io/fleet/pkg/utils/controller" "go.goms.io/fleet/pkg/utils/defaulter" - "go.goms.io/fleet/pkg/utils/informer" "go.goms.io/fleet/pkg/utils/labels" "go.goms.io/fleet/pkg/utils/resource" fleettime "go.goms.io/fleet/pkg/utils/time" @@ -64,12 +62,6 @@ const controllerResyncPeriod = 30 * time.Minute // Reconciler reconciles a cluster resource placement object type Reconciler struct { - // the informer contains the cache for all the resources we need. - InformerManager informer.Manager - - // RestMapper is used to convert between gvk and gvr on known resources. - RestMapper meta.RESTMapper - // Client is used to update objects which goes to the api server directly. Client client.Client @@ -78,25 +70,19 @@ type Reconciler struct { // It's only needed by v1beta1 APIs. UncachedReader client.Reader - // ResourceConfig contains all the API resources that we won't select based on allowed or skipped propagating APIs option. - ResourceConfig *utils.ResourceConfig - - // SkippedNamespaces contains the namespaces that we should not propagate. - SkippedNamespaces map[string]bool - Recorder record.EventRecorder Scheme *runtime.Scheme + // ResourceSelectorResolver + ResourceSelectorResolver controller.ResourceSelectorResolver + // ResourceSnapshotCreationMinimumInterval is the minimum interval to create a new resourcesnapshot // to avoid too frequent updates. ResourceSnapshotCreationMinimumInterval time.Duration // ResourceChangesCollectionDuration is the duration for collecting resource changes into one snapshot. ResourceChangesCollectionDuration time.Duration - - // EnableWorkload indicates whether workloads are allowed to run on the hub cluster. 
- EnableWorkload bool } func (r *Reconciler) Reconcile(ctx context.Context, key controller.QueueKey) (ctrl.Result, error) { @@ -198,7 +184,7 @@ func (r *Reconciler) handleUpdate(ctx context.Context, placementObj fleetv1beta1 } // validate the resource selectors first before creating any snapshot - envelopeObjCount, selectedResources, selectedResourceIDs, err := r.selectResourcesForPlacement(placementObj) + envelopeObjCount, selectedResources, selectedResourceIDs, err := r.ResourceSelectorResolver.SelectResourcesForPlacement(placementObj) if err != nil { klog.ErrorS(err, "Failed to select the resources", "placement", placementKObj) if !errors.Is(err, controller.ErrUserError) { diff --git a/pkg/controllers/placement/suite_test.go b/pkg/controllers/placement/suite_test.go index c8381a4f4..c478f2e7e 100644 --- a/pkg/controllers/placement/suite_test.go +++ b/pkg/controllers/placement/suite_test.go @@ -111,11 +111,7 @@ var _ = BeforeSuite(func() { }) Expect(err).Should(Succeed(), "failed to create manager") - reconciler := &Reconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - UncachedReader: mgr.GetAPIReader(), - Recorder: mgr.GetEventRecorderFor(controllerName), + resourceSelectorResolver := controller.ResourceSelectorResolver{ RestMapper: mgr.GetRESTMapper(), InformerManager: informer.NewInformerManager(dynamicClient, 5*time.Minute, ctx.Done()), ResourceConfig: utils.NewResourceConfig(false), @@ -123,6 +119,13 @@ var _ = BeforeSuite(func() { "default": true, }, } + reconciler := &Reconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + UncachedReader: mgr.GetAPIReader(), + Recorder: mgr.GetEventRecorderFor(controllerName), + ResourceSelectorResolver: resourceSelectorResolver, + } opts := options.RateLimitOptions{ RateLimiterBaseDelay: 5 * time.Millisecond, RateLimiterMaxDelay: 60 * time.Second, diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go index fd9837c78..7e91365d1 100644 --- a/pkg/controllers/workapplier/controller.go +++ b/pkg/controllers/workapplier/controller.go @@ -551,23 +551,40 @@ func (r *Reconciler) forgetWorkAndRemoveFinalizer(ctx context.Context, work *fle // ensureAppliedWork makes sure that an associated appliedWork and a finalizer on the work resource exists on the cluster. func (r *Reconciler) ensureAppliedWork(ctx context.Context, work *fleetv1beta1.Work) (*fleetv1beta1.AppliedWork, error) { workRef := klog.KObj(work) - appliedWork := &fleetv1beta1.AppliedWork{} - hasFinalizer := false - if controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) { - hasFinalizer = true - err := r.spokeClient.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork) - switch { - case apierrors.IsNotFound(err): - klog.ErrorS(err, "AppliedWork finalizer resource does not exist even with the finalizer, it will be recreated", "appliedWork", workRef.Name) - case err != nil: - klog.ErrorS(err, "Failed to retrieve the appliedWork ", "appliedWork", workRef.Name) - return nil, controller.NewAPIServerError(true, err) - default: - return appliedWork, nil + + // Add a finalizer to the Work object. 
+ if !controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) { + work.Finalizers = append(work.Finalizers, fleetv1beta1.WorkFinalizer) + + if err := r.hubClient.Update(ctx, work, &client.UpdateOptions{}); err != nil { + klog.ErrorS(err, "Failed to add the cleanup finalizer to the work", "work", workRef) + return nil, controller.NewAPIServerError(false, err) } + klog.V(2).InfoS("Added the cleanup finalizer to the Work object", "work", workRef) + } + + // Check if an AppliedWork object already exists for the Work object. + // + // Since we only create an AppliedWork object after adding the finalizer to the Work object, + // usually it is safe for us to assume that if the finalizer is absent, the AppliedWork object should + // not exist. This is not the case with the work applier though, as the controller features a + // Leave method that will strip all Work objects off their finalizers, which is called when the + // member cluster leaves the fleet. If the member cluster chooses to re-join the fleet, the controller + // will see a Work object with no finalizer but with an AppliedWork object. Because of this, here we always + // check for the existence of the AppliedWork object, with or without the finalizer. + appliedWork := &fleetv1beta1.AppliedWork{} + err := r.spokeClient.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork) + switch { + case err == nil: + // The AppliedWork already exists; no further action is needed. + klog.V(2).InfoS("Found an AppliedWork for the Work object", "work", workRef, "appliedWork", klog.KObj(appliedWork)) + return appliedWork, nil + case !apierrors.IsNotFound(err): + klog.ErrorS(err, "Failed to retrieve the appliedWork object", "appliedWork", workRef.Name) + return nil, controller.NewAPIServerError(true, err) } - // we create the appliedWork before setting the finalizer, so it should always exist unless it's deleted behind our back + // The AppliedWork object does not exist; create one. appliedWork = &fleetv1beta1.AppliedWork{ ObjectMeta: metav1.ObjectMeta{ Name: work.Name, @@ -577,20 +594,14 @@ func (r *Reconciler) ensureAppliedWork(ctx context.Context, work *fleetv1beta1.W WorkNamespace: work.Namespace, }, } - if err := r.spokeClient.Create(ctx, appliedWork); err != nil && !apierrors.IsAlreadyExists(err) { - klog.ErrorS(err, "AppliedWork create failed", "appliedWork", workRef.Name) + if err := r.spokeClient.Create(ctx, appliedWork); err != nil { + // Note: the controller must retry on AppliedWork AlreadyExists errors; otherwise the + // controller will run the reconciliation loop with an AppliedWork that has no UID, + // which might lead to takeover failures in later steps. 
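Distilled, the reworked ensureAppliedWork above is a finalizer-first, get-or-create sequence. The sketch below uses a hypothetical helper name and simplified error handling (for instance controllerutil.AddFinalizer instead of the manual append, and no wrapped controller errors or logging), so it is an outline of the ordering rather than the controller's actual code.

```go
package workapplier

import (
	"context"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
)

// ensureAppliedWorkSketch is a simplified outline of the ensure flow.
func ensureAppliedWorkSketch(ctx context.Context, hub, spoke client.Client, work *fleetv1beta1.Work) (*fleetv1beta1.AppliedWork, error) {
	// 1. Make sure the Work carries the cleanup finalizer before anything is
	//    created on the member cluster, so deletion can always be reconciled.
	if !controllerutil.ContainsFinalizer(work, fleetv1beta1.WorkFinalizer) {
		controllerutil.AddFinalizer(work, fleetv1beta1.WorkFinalizer)
		if err := hub.Update(ctx, work); err != nil {
			return nil, err
		}
	}

	// 2. Always look for an existing AppliedWork: after a Leave/re-join cycle a
	//    Work may have lost its finalizer while its AppliedWork still exists.
	appliedWork := &fleetv1beta1.AppliedWork{}
	err := spoke.Get(ctx, types.NamespacedName{Name: work.Name}, appliedWork)
	switch {
	case err == nil:
		return appliedWork, nil
	case !apierrors.IsNotFound(err):
		return nil, err
	}

	// 3. Create the AppliedWork if it is missing; an AlreadyExists race is
	//    returned as an error so that the next reconciliation re-reads the
	//    object together with its UID instead of proceeding with a UID-less one.
	appliedWork = &fleetv1beta1.AppliedWork{
		ObjectMeta: metav1.ObjectMeta{Name: work.Name},
		Spec: fleetv1beta1.AppliedWorkSpec{
			WorkName:      work.Name,
			WorkNamespace: work.Namespace,
		},
	}
	if err := spoke.Create(ctx, appliedWork); err != nil {
		return nil, err
	}
	return appliedWork, nil
}
```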
+ klog.ErrorS(err, "Failed to create an AppliedWork object for the Work object", "appliedWork", klog.KObj(appliedWork), "work", workRef) return nil, controller.NewAPIServerError(false, err) } - if !hasFinalizer { - klog.InfoS("Add the finalizer to the work", "work", workRef) - work.Finalizers = append(work.Finalizers, fleetv1beta1.WorkFinalizer) - - if err := r.hubClient.Update(ctx, work, &client.UpdateOptions{}); err != nil { - klog.ErrorS(err, "Failed to add the finalizer to the work", "work", workRef) - return nil, controller.NewAPIServerError(false, err) - } - } - klog.InfoS("Recreated the appliedWork resource", "appliedWork", workRef.Name) + klog.V(2).InfoS("Created an AppliedWork for the Work object", "work", workRef, "appliedWork", klog.KObj(appliedWork)) return appliedWork, nil } diff --git a/pkg/controllers/workapplier/controller_test.go b/pkg/controllers/workapplier/controller_test.go index 2dadd9722..1a266bc5f 100644 --- a/pkg/controllers/workapplier/controller_test.go +++ b/pkg/controllers/workapplier/controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package workapplier import ( + "context" "fmt" "log" "os" @@ -32,8 +33,10 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client/fake" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" ) @@ -208,7 +211,8 @@ var ( ) var ( - ignoreFieldTypeMetaInNamespace = cmpopts.IgnoreFields(corev1.Namespace{}, "TypeMeta") + ignoreFieldTypeMetaInNamespace = cmpopts.IgnoreFields(corev1.Namespace{}, "TypeMeta") + ignoreFieldObjectMetaresourceVersion = cmpopts.IgnoreFields(metav1.ObjectMeta{}, "ResourceVersion") lessFuncAppliedResourceMeta = func(i, j fleetv1beta1.AppliedResourceMeta) bool { iStr := fmt.Sprintf("%s/%s/%s/%s/%s", i.Group, i.Version, i.Kind, i.Namespace, i.Name) @@ -262,6 +266,14 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } +func fakeClientScheme(t *testing.T) *runtime.Scheme { + scheme := runtime.NewScheme() + if err := fleetv1beta1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add placement v1beta1 scheme: %v", err) + } + return scheme +} + func initializeVariables() { var err error @@ -332,3 +344,197 @@ func TestPrepareManifestProcessingBundles(t *testing.T) { t.Errorf("prepareManifestProcessingBundles() mismatches (-got +want):\n%s", diff) } } + +// TestEnsureAppliedWork tests the ensureAppliedWork method. 
+func TestEnsureAppliedWork(t *testing.T) { + ctx := context.Background() + + fakeUID := types.UID("foo") + testCases := []struct { + name string + work *fleetv1beta1.Work + appliedWork *fleetv1beta1.AppliedWork + wantWork *fleetv1beta1.Work + wantAppliedWork *fleetv1beta1.AppliedWork + }{ + { + name: "with work cleanup finalizer present, but no corresponding AppliedWork exists", + work: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + Finalizers: []string{ + fleetv1beta1.WorkFinalizer, + }, + }, + }, + wantWork: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + Finalizers: []string{ + fleetv1beta1.WorkFinalizer, + }, + }, + }, + wantAppliedWork: &fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + }, + Spec: fleetv1beta1.AppliedWorkSpec{ + WorkName: workName, + WorkNamespace: memberReservedNSName1, + }, + }, + }, + { + name: "with work cleanup finalizer present, and corresponding AppliedWork exists", + work: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + Finalizers: []string{ + fleetv1beta1.WorkFinalizer, + }, + }, + }, + appliedWork: &fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + // Add the UID field to track if the method returns the existing object. + UID: fakeUID, + }, + Spec: fleetv1beta1.AppliedWorkSpec{ + WorkName: workName, + WorkNamespace: memberReservedNSName1, + }, + }, + wantWork: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + Finalizers: []string{ + fleetv1beta1.WorkFinalizer, + }, + }, + }, + wantAppliedWork: &fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + UID: fakeUID, + }, + Spec: fleetv1beta1.AppliedWorkSpec{ + WorkName: workName, + WorkNamespace: memberReservedNSName1, + }, + }, + }, + { + name: "without work cleanup finalizer, but corresponding AppliedWork exists", + work: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + }, + }, + appliedWork: &fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + // Add the UID field to track if the method returns the existing object. 
+ UID: fakeUID, + }, + Spec: fleetv1beta1.AppliedWorkSpec{ + WorkName: workName, + WorkNamespace: memberReservedNSName1, + }, + }, + wantWork: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + Finalizers: []string{ + fleetv1beta1.WorkFinalizer, + }, + }, + }, + wantAppliedWork: &fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + UID: fakeUID, + }, + Spec: fleetv1beta1.AppliedWorkSpec{ + WorkName: workName, + WorkNamespace: memberReservedNSName1, + }, + }, + }, + { + name: "without work cleanup finalizer, and no corresponding AppliedWork exists", + work: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + }, + }, + wantWork: &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + Namespace: memberReservedNSName1, + Finalizers: []string{ + fleetv1beta1.WorkFinalizer, + }, + }, + }, + wantAppliedWork: &fleetv1beta1.AppliedWork{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + }, + Spec: fleetv1beta1.AppliedWorkSpec{ + WorkName: workName, + WorkNamespace: memberReservedNSName1, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + hubClientScheme := fakeClientScheme(t) + fakeHubClient := fake.NewClientBuilder(). + WithScheme(hubClientScheme). + WithObjects(tc.work). + Build() + + memberClientScheme := fakeClientScheme(t) + fakeMemberClientBuilder := fake.NewClientBuilder().WithScheme(memberClientScheme) + if tc.appliedWork != nil { + fakeMemberClientBuilder = fakeMemberClientBuilder.WithObjects(tc.appliedWork) + } + fakeMemberClient := fakeMemberClientBuilder.Build() + + r := &Reconciler{ + hubClient: fakeHubClient, + spokeClient: fakeMemberClient, + } + + gotAppliedWork, err := r.ensureAppliedWork(ctx, tc.work) + if err != nil { + t.Fatalf("ensureAppliedWork() = %v, want no error", err) + } + + // Verify the Work object. + gotWork := &fleetv1beta1.Work{} + if err := fakeHubClient.Get(ctx, types.NamespacedName{Name: tc.work.Name, Namespace: tc.work.Namespace}, gotWork); err != nil { + t.Fatalf("failed to get Work object from fake hub client: %v", err) + } + if diff := cmp.Diff(gotWork, tc.wantWork, ignoreFieldObjectMetaresourceVersion); diff != "" { + t.Errorf("Work objects diff (-got +want):\n%s", diff) + } + + // Verify the AppliedWork object. 
+ if diff := cmp.Diff(gotAppliedWork, tc.wantAppliedWork, ignoreFieldObjectMetaresourceVersion); diff != "" { + t.Errorf("AppliedWork objects diff (-got +want):\n%s", diff) + } + }) + } +} diff --git a/pkg/resourcewatcher/change_dector.go b/pkg/resourcewatcher/change_dector.go index 01f314944..0f445878a 100644 --- a/pkg/resourcewatcher/change_dector.go +++ b/pkg/resourcewatcher/change_dector.go @@ -168,7 +168,7 @@ func (d *ChangeDetector) dynamicResourceFilter(obj interface{}) bool { } if unstructuredObj, ok := obj.(*unstructured.Unstructured); ok { - shouldPropagate, err := utils.ShouldPropagateObj(d.InformerManager, unstructuredObj.DeepCopy(), d.EnableWorkload) + shouldPropagate, err := controller.ShouldPropagateObj(d.InformerManager, unstructuredObj.DeepCopy(), d.EnableWorkload) if err != nil || !shouldPropagate { klog.V(5).InfoS("Skip watching resource in namespace", "namespace", cwKey.Namespace, "group", cwKey.Group, "version", cwKey.Version, "kind", cwKey.Kind, "object", cwKey.Name) diff --git a/pkg/utils/common.go b/pkg/utils/common.go index dfb367d01..8a3764e24 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -27,7 +27,6 @@ import ( appv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - discoveryv1 "k8s.io/api/discovery/v1" networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -35,10 +34,7 @@ import ( storagev1 "k8s.io/api/storage/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/util/retry" @@ -50,8 +46,6 @@ import ( placementv1 "go.goms.io/fleet/apis/placement/v1" placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/utils/condition" - "go.goms.io/fleet/pkg/utils/controller" - "go.goms.io/fleet/pkg/utils/informer" ) const ( @@ -505,71 +499,6 @@ func CheckCRDInstalled(discoveryClient discovery.DiscoveryInterface, gvk schema. return err } -// ShouldPropagateObj decides if one should propagate the object. -// PVCs are only propagated when enableWorkload is false (workloads not allowed on hub). -func ShouldPropagateObj(informerManager informer.Manager, uObj *unstructured.Unstructured, enableWorkload bool) (bool, error) { - // TODO: add more special handling for different resource kind - switch uObj.GroupVersionKind() { - case appv1.SchemeGroupVersion.WithKind(ReplicaSetKind): - // Skip ReplicaSets if they are managed by Deployments (have owner references). - // Standalone ReplicaSets (without owners) can be propagated. - if len(uObj.GetOwnerReferences()) > 0 { - return false, nil - } - case appv1.SchemeGroupVersion.WithKind("ControllerRevision"): - // Skip ControllerRevisions if they are managed by DaemonSets/StatefulSets (have owner references). - // Standalone ControllerRevisions (without owners) can be propagated. - if len(uObj.GetOwnerReferences()) > 0 { - return false, nil - } - case corev1.SchemeGroupVersion.WithKind(ConfigMapKind): - // Skip the built-in custom CA certificate created in the namespace. - if uObj.GetName() == "kube-root-ca.crt" { - return false, nil - } - case corev1.SchemeGroupVersion.WithKind("ServiceAccount"): - // Skip the default service account created in the namespace. 
- if uObj.GetName() == "default" { - return false, nil - } - case corev1.SchemeGroupVersion.WithKind("Secret"): - // The secret, with type 'kubernetes.io/service-account-token', is created along with `ServiceAccount` should be - // prevented from propagating. - var secret corev1.Secret - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(uObj.Object, &secret); err != nil { - return false, controller.NewUnexpectedBehaviorError(fmt.Errorf("failed to convert a secret object %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err)) - } - if secret.Type == corev1.SecretTypeServiceAccountToken { - return false, nil - } - case corev1.SchemeGroupVersion.WithKind("PersistentVolumeClaim"): - // Skip PersistentVolumeClaims by default to avoid conflicts with the PVCs created by statefulset controller. - // This only happens if the workloads are allowed to run on the hub cluster. - if enableWorkload { - return false, nil - } - case corev1.SchemeGroupVersion.WithKind("Endpoints"): - // we assume that all endpoints with the same name of a service is created by the service controller - if _, err := informerManager.Lister(ServiceGVR).ByNamespace(uObj.GetNamespace()).Get(uObj.GetName()); err != nil { - if apierrors.IsNotFound(err) { - // there is no service of the same name as the end point, - // we assume that this endpoint is created by the user - return true, nil - } - return false, controller.NewAPIServerError(true, fmt.Errorf("failed to get the service %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err)) - } - // we find a service of the same name as the endpoint, we assume it's created by the service - return false, nil - case discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice"): - // all EndpointSlice created by the EndpointSlice controller has a managed by label - if _, exist := uObj.GetLabels()[discoveryv1.LabelManagedBy]; exist { - // do not propagate hub cluster generated endpoint slice - return false, nil - } - } - return true, nil -} - // IsReservedNamespace indicates if an argued namespace is reserved. func IsReservedNamespace(namespace string) bool { return strings.HasPrefix(namespace, fleetPrefix) || strings.HasPrefix(namespace, kubePrefix) @@ -774,7 +703,7 @@ var LessFuncDiffedResourcePlacementsV1 = func(a, b placementv1.DiffedResourcePla return aStr < bStr } -// LessFuncCondition is a less function for sorting conditions based on its types. +// LessFuncConditionByType is a less function for sorting conditions based on its types. 
var LessFuncConditionByType = func(a, b metav1.Condition) bool { return a.Type < b.Type } diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go index f41b900cc..9c8900662 100644 --- a/pkg/utils/common_test.go +++ b/pkg/utils/common_test.go @@ -6,7 +6,6 @@ import ( "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/utils/ptr" fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" @@ -1190,221 +1189,3 @@ func TestIsDiffedResourcePlacementEqual(t *testing.T) { }) } } - -func TestShouldPropagateObj(t *testing.T) { - tests := []struct { - name string - obj map[string]interface{} - ownerReferences []metav1.OwnerReference - enableWorkload bool - want bool - }{ - { - name: "standalone replicaset without ownerReferences should propagate", - obj: map[string]interface{}{ - "apiVersion": "apps/v1", - "kind": "ReplicaSet", - "metadata": map[string]interface{}{ - "name": "standalone-rs", - "namespace": "default", - }, - }, - ownerReferences: nil, - enableWorkload: true, - want: true, - }, - { - name: "standalone replicaset without ownerReferences should propagate if workload is disabled", - obj: map[string]interface{}{ - "apiVersion": "apps/v1", - "kind": "ReplicaSet", - "metadata": map[string]interface{}{ - "name": "standalone-rs", - "namespace": "default", - }, - }, - ownerReferences: nil, - enableWorkload: false, - want: true, - }, - { - name: "standalone pod without ownerReferences should propagate", - obj: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Pod", - "metadata": map[string]interface{}{ - "name": "standalone-pod", - "namespace": "default", - }, - }, - ownerReferences: nil, - enableWorkload: true, - want: true, - }, - { - name: "replicaset with deployment owner should NOT propagate", - obj: map[string]interface{}{ - "apiVersion": "apps/v1", - "kind": "ReplicaSet", - "metadata": map[string]interface{}{ - "name": "test-deploy-abc123", - "namespace": "default", - }, - }, - ownerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "Deployment", - Name: "test-deploy", - UID: "12345", - }, - }, - enableWorkload: true, - want: false, - }, - { - name: "pod owned by replicaset - passes ShouldPropagateObj but filtered by resource config", - obj: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Pod", - "metadata": map[string]interface{}{ - "name": "test-deploy-abc123-xyz", - "namespace": "default", - }, - }, - ownerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "ReplicaSet", - Name: "test-deploy-abc123", - UID: "67890", - }, - }, - enableWorkload: false, - want: true, // ShouldPropagateObj doesn't filter Pods - they're filtered by NewResourceConfig - }, - { - name: "controllerrevision owned by daemonset should NOT propagate", - obj: map[string]interface{}{ - "apiVersion": "apps/v1", - "kind": "ControllerRevision", - "metadata": map[string]interface{}{ - "name": "test-ds-7b9848797f", - "namespace": "default", - }, - }, - ownerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "DaemonSet", - Name: "test-ds", - UID: "abcdef", - }, - }, - enableWorkload: false, - want: false, - }, - { - name: "controllerrevision owned by statefulset should NOT propagate", - obj: map[string]interface{}{ - "apiVersion": "apps/v1", - "kind": "ControllerRevision", - "metadata": map[string]interface{}{ - "name": "test-ss-7878b4b446", - "namespace": "default", - }, - }, - ownerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - 
Kind: "StatefulSet", - Name: "test-ss", - UID: "fedcba", - }, - }, - enableWorkload: false, - want: false, - }, - { - name: "standalone controllerrevision without owner should propagate", - obj: map[string]interface{}{ - "apiVersion": "apps/v1", - "kind": "ControllerRevision", - "metadata": map[string]interface{}{ - "name": "custom-revision", - "namespace": "default", - }, - }, - ownerReferences: nil, - enableWorkload: false, - want: true, - }, - { - name: "PVC should propagate when workload is disabled", - obj: map[string]interface{}{ - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": map[string]interface{}{ - "name": "test-pvc", - "namespace": "default", - }, - }, - ownerReferences: nil, - enableWorkload: false, - want: true, - }, - { - name: "PVC should NOT propagate when workload is enabled", - obj: map[string]interface{}{ - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": map[string]interface{}{ - "name": "test-pvc", - "namespace": "default", - }, - }, - ownerReferences: nil, - enableWorkload: true, - want: false, - }, - { - name: "PVC with ownerReferences should NOT propagate when workload is enabled", - obj: map[string]interface{}{ - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": map[string]interface{}{ - "name": "data-statefulset-0", - "namespace": "default", - }, - }, - ownerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Kind: "StatefulSet", - Name: "statefulset", - UID: "sts-uid", - }, - }, - enableWorkload: true, - want: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - uObj := &unstructured.Unstructured{Object: tt.obj} - if tt.ownerReferences != nil { - uObj.SetOwnerReferences(tt.ownerReferences) - } - - got, err := ShouldPropagateObj(nil, uObj, tt.enableWorkload) - if err != nil { - t.Errorf("ShouldPropagateObj() error = %v", err) - return - } - if got != tt.want { - t.Errorf("ShouldPropagateObj() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/controllers/placement/resource_selector.go b/pkg/utils/controller/resource_selector_resolver.go similarity index 67% rename from pkg/controllers/placement/resource_selector.go rename to pkg/utils/controller/resource_selector_resolver.go index d849d3c92..ca6065081 100644 --- a/pkg/controllers/placement/resource_selector.go +++ b/pkg/utils/controller/resource_selector_resolver.go @@ -14,15 +14,18 @@ See the License for the specific language governing permissions and limitations under the License. */ -package placement +package controller import ( "fmt" "sort" "strings" + appv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -34,9 +37,9 @@ import ( "k8s.io/kubectl/pkg/util/deployment" "sigs.k8s.io/controller-runtime/pkg/client" - fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" + placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/utils" - "go.goms.io/fleet/pkg/utils/controller" + "go.goms.io/fleet/pkg/utils/informer" ) var ( @@ -88,10 +91,152 @@ var ( } ) +type ResourceSelectorResolver struct { + // SkippedNamespaces contains the namespaces that should be skipped when selecting resources. + SkippedNamespaces map[string]bool + + // ResourceConfig contains the resource configuration. 
+ ResourceConfig *utils.ResourceConfig + + // InformerManager is the informer manager. + InformerManager informer.Manager + + // RestMapper is the rest mapper used to convert between gvk and gvr on known resources. + RestMapper meta.RESTMapper + + // EnableWorkload indicates whether workload resources are allowed to be selected. + EnableWorkload bool +} + +// SelectResourcesForPlacement selects the resources according to the placement resourceSelectors. +// It also generates an array of resource content and resource identifier based on the selected resources. +// It also returns the number of envelope configmaps so the CRP controller can have the right expectation of the number of work objects. +func (rs *ResourceSelectorResolver) SelectResourcesForPlacement(placementObj placementv1beta1.PlacementObj) (int, []placementv1beta1.ResourceContent, []placementv1beta1.ResourceIdentifier, error) { + envelopeObjCount := 0 + selectedObjects, err := rs.gatherSelectedResource(types.NamespacedName{ + Name: placementObj.GetName(), + Namespace: placementObj.GetNamespace(), + }, placementObj.GetPlacementSpec().ResourceSelectors) + if err != nil { + return 0, nil, nil, err + } + + resources := make([]placementv1beta1.ResourceContent, len(selectedObjects)) + resourcesIDs := make([]placementv1beta1.ResourceIdentifier, len(selectedObjects)) + for i, unstructuredObj := range selectedObjects { + rc, err := generateResourceContent(unstructuredObj) + if err != nil { + return 0, nil, nil, err + } + uGVK := unstructuredObj.GetObjectKind().GroupVersionKind().GroupKind() + switch uGVK { + case utils.ClusterResourceEnvelopeGK: + envelopeObjCount++ + case utils.ResourceEnvelopeGK: + envelopeObjCount++ + } + resources[i] = *rc + ri := placementv1beta1.ResourceIdentifier{ + Group: unstructuredObj.GroupVersionKind().Group, + Version: unstructuredObj.GroupVersionKind().Version, + Kind: unstructuredObj.GroupVersionKind().Kind, + Name: unstructuredObj.GetName(), + Namespace: unstructuredObj.GetNamespace(), + } + resourcesIDs[i] = ri + } + return envelopeObjCount, resources, resourcesIDs, nil +} + +// generateResourceContent creates a resource content from the unstructured obj. +func generateResourceContent(object *unstructured.Unstructured) (*placementv1beta1.ResourceContent, error) { + rawContent, err := generateRawContent(object) + if err != nil { + return nil, NewUnexpectedBehaviorError(err) + } + return &placementv1beta1.ResourceContent{ + RawExtension: runtime.RawExtension{Raw: rawContent}, + }, nil +} + +// generateRawContent strips all the unnecessary fields to prepare the objects for dispatch. +func generateRawContent(object *unstructured.Unstructured) ([]byte, error) { + // Make a deep copy of the object as we are modifying it. 
+ object = object.DeepCopy() + // we keep the annotation/label/finalizer/owner references/delete grace period + object.SetResourceVersion("") + object.SetGeneration(0) + object.SetUID("") + object.SetSelfLink("") + object.SetDeletionTimestamp(nil) + object.SetManagedFields(nil) + + annots := object.GetAnnotations() + if annots != nil { + // Remove kubectl last applied annotation if exist + delete(annots, corev1.LastAppliedConfigAnnotation) + // Remove the revision annotation set by deployment + delete(annots, deployment.RevisionAnnotation) + if len(annots) == 0 { + object.SetAnnotations(nil) + } else { + object.SetAnnotations(annots) + } + } + // Remove all the owner references as the UID in the owner reference can't be transferred to + // the member clusters + // TODO: Establish a way to keep the ownership relation through work-api + object.SetOwnerReferences(nil) + unstructured.RemoveNestedField(object.Object, "metadata", "creationTimestamp") + unstructured.RemoveNestedField(object.Object, "status") + + // TODO: see if there are other cases that we may have some extra fields + if object.GetKind() == "Service" && object.GetAPIVersion() == "v1" { + if clusterIP, exist, _ := unstructured.NestedString(object.Object, "spec", "clusterIP"); exist && clusterIP != corev1.ClusterIPNone { + unstructured.RemoveNestedField(object.Object, "spec", "clusterIP") + unstructured.RemoveNestedField(object.Object, "spec", "clusterIPs") + } + // We should remove all node ports that are assigned by hubcluster if any. + unstructured.RemoveNestedField(object.Object, "spec", "healthCheckNodePort") + + vals, found, err := unstructured.NestedFieldNoCopy(object.Object, "spec", "ports") + if found && err == nil { + if ports, ok := vals.([]interface{}); ok { + for i := range ports { + if each, ok := ports[i].(map[string]interface{}); ok { + delete(each, "nodePort") + } + } + } + } + if err != nil { + return nil, fmt.Errorf("failed to get the ports field in Service object, name =%s: %w", object.GetName(), err) + } + } else if object.GetKind() == "Job" && object.GetAPIVersion() == batchv1.SchemeGroupVersion.String() { + if manualSelector, exist, _ := unstructured.NestedBool(object.Object, "spec", "manualSelector"); !exist || !manualSelector { + // remove the selector field and labels added by the api-server if the job is not created with manual selector + // whose value conflict with the ones created by the member cluster api server + // https://github.com/kubernetes/kubernetes/blob/d4fde1e92a83cb533ae63b3abe9d49f08efb7a2f/pkg/registry/batch/job/strategy.go#L219 + // k8s used to add an old label called "controller-uid" but use a new label called "batch.kubernetes.io/controller-uid" after 1.26 + unstructured.RemoveNestedField(object.Object, "spec", "selector", "matchLabels", "controller-uid") + unstructured.RemoveNestedField(object.Object, "spec", "selector", "matchLabels", "batch.kubernetes.io/controller-uid") + unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "creationTimestamp") + unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "labels", "controller-uid") + unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "labels", "batch.kubernetes.io/controller-uid") + } + } + + rawContent, err := object.MarshalJSON() + if err != nil { + return nil, fmt.Errorf("failed to marshal the unstructured object gvk = %s, name =%s: %w", object.GroupVersionKind(), object.GetName(), err) + } + return rawContent, nil +} + // gatherSelectedResource gets all the 
resources according to the resource selector. -func (r *Reconciler) gatherSelectedResource(placementKey types.NamespacedName, selectors []fleetv1beta1.ResourceSelectorTerm) ([]*unstructured.Unstructured, error) { +func (rs *ResourceSelectorResolver) gatherSelectedResource(placementKey types.NamespacedName, selectors []placementv1beta1.ResourceSelectorTerm) ([]*unstructured.Unstructured, error) { var resources []*unstructured.Unstructured - var resourceMap = make(map[fleetv1beta1.ResourceIdentifier]bool) + var resourceMap = make(map[placementv1beta1.ResourceIdentifier]bool) for _, selector := range selectors { gvk := schema.GroupVersionKind{ Group: selector.Group, @@ -99,23 +244,23 @@ func (r *Reconciler) gatherSelectedResource(placementKey types.NamespacedName, s Kind: selector.Kind, } - if r.ResourceConfig.IsResourceDisabled(gvk) { + if rs.ResourceConfig.IsResourceDisabled(gvk) { klog.V(2).InfoS("Skip select resource", "group version kind", gvk.String()) continue } var objs []runtime.Object var err error - if gvk == utils.NamespaceGVK && placementKey.Namespace == "" && selector.SelectionScope != fleetv1beta1.NamespaceOnly { - objs, err = r.fetchNamespaceResources(selector, placementKey.Name) + if gvk == utils.NamespaceGVK && placementKey.Namespace == "" && selector.SelectionScope != placementv1beta1.NamespaceOnly { + objs, err = rs.fetchNamespaceResources(selector, placementKey.Name) } else { - objs, err = r.fetchResources(selector, placementKey) + objs, err = rs.fetchResources(selector, placementKey) } if err != nil { return nil, err } for _, obj := range objs { uObj := obj.(*unstructured.Unstructured) - ri := fleetv1beta1.ResourceIdentifier{ + ri := placementv1beta1.ResourceIdentifier{ Group: obj.GetObjectKind().GroupVersionKind().Group, Version: obj.GetObjectKind().GroupVersionKind().Version, Kind: obj.GetObjectKind().GroupVersionKind().Kind, @@ -125,7 +270,7 @@ func (r *Reconciler) gatherSelectedResource(placementKey types.NamespacedName, s if _, exist := resourceMap[ri]; exist { err = fmt.Errorf("found duplicate resource %+v", ri) klog.ErrorS(err, "User selected one resource more than once", "resource", ri, "placement", placementKey) - return nil, controller.NewUserError(err) + return nil, NewUserError(err) } resourceMap[ri] = true resources = append(resources, uObj) @@ -182,16 +327,116 @@ func lessByGVK(obj1, obj2 *unstructured.Unstructured, ignoreKind bool) bool { return comp < 0 } +// fetchNamespaceResources retrieves all the objects for a ResourceSelectorTerm that is for namespace. 
+func (rs *ResourceSelectorResolver) fetchNamespaceResources(selector placementv1beta1.ResourceSelectorTerm, placementName string) ([]runtime.Object, error) { + klog.V(2).InfoS("start to fetch the namespace resources by the selector", "selector", selector) + var resources []runtime.Object + + if len(selector.Name) != 0 { + // just a single namespace + objs, err := rs.fetchAllResourcesInOneNamespace(selector.Name, placementName) + if err != nil { + klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", selector.Name) + return nil, err + } + return objs, err + } + + // go through each namespace + var labelSelector labels.Selector + var err error + if selector.LabelSelector == nil { + labelSelector = labels.Everything() + } else { + labelSelector, err = metav1.LabelSelectorAsSelector(selector.LabelSelector) + if err != nil { + return nil, NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err)) + } + } + namespaces, err := rs.InformerManager.Lister(utils.NamespaceGVR).List(labelSelector) + if err != nil { + klog.ErrorS(err, "Cannot list all the namespaces by the label selector", "labelSelector", labelSelector, "placement", placementName) + return nil, NewAPIServerError(true, err) + } + + for _, namespace := range namespaces { + ns, err := meta.Accessor(namespace) + if err != nil { + return nil, NewUnexpectedBehaviorError(fmt.Errorf("cannot get the name of a namespace object: %w", err)) + } + objs, err := rs.fetchAllResourcesInOneNamespace(ns.GetName(), placementName) + if err != nil { + klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", ns.GetName()) + return nil, err + } + resources = append(resources, objs...) + } + return resources, nil +} + +// fetchAllResourcesInOneNamespace retrieves all the objects inside a single namespace which includes the namespace itself. 
+func (rs *ResourceSelectorResolver) fetchAllResourcesInOneNamespace(namespaceName string, placeName string) ([]runtime.Object, error) { + var resources []runtime.Object + + if !utils.ShouldPropagateNamespace(namespaceName, rs.SkippedNamespaces) { + err := fmt.Errorf("invalid clusterRresourcePlacement %s: namespace %s is not allowed to propagate", placeName, namespaceName) + return nil, NewUserError(err) + } + + klog.V(2).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName) + // select the namespace object itself + obj, err := rs.InformerManager.Lister(utils.NamespaceGVR).Get(namespaceName) + if err != nil { + klog.ErrorS(err, "cannot get the namespace", "namespace", namespaceName) + return nil, NewAPIServerError(true, client.IgnoreNotFound(err)) + } + nameSpaceObj := obj.DeepCopyObject().(*unstructured.Unstructured) + if nameSpaceObj.GetDeletionTimestamp() != nil { + // skip a to be deleted namespace + klog.V(2).InfoS("skip the deleting namespace resources by the selector", + "placeName", placeName, "namespace", namespaceName) + return resources, nil + } + resources = append(resources, obj) + + trackedResource := rs.InformerManager.GetNameSpaceScopedResources() + for _, gvr := range trackedResource { + if !utils.ShouldProcessResource(gvr, rs.RestMapper, rs.ResourceConfig) { + continue + } + if !rs.InformerManager.IsInformerSynced(gvr) { + return nil, NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", gvr)) + } + lister := rs.InformerManager.Lister(gvr) + objs, err := lister.ByNamespace(namespaceName).List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "Cannot list all the objects in namespace", "gvr", gvr, "namespace", namespaceName) + return nil, NewAPIServerError(true, err) + } + for _, obj := range objs { + shouldInclude, err := rs.ShouldPropagateObj(namespaceName, placeName, obj) + if err != nil { + return nil, err + } + if shouldInclude { + resources = append(resources, obj) + } + } + } + + return resources, nil +} + // fetchResources retrieves the objects based on the selector. -func (r *Reconciler) fetchResources(selector fleetv1beta1.ResourceSelectorTerm, placementKey types.NamespacedName) ([]runtime.Object, error) { +func (rs *ResourceSelectorResolver) fetchResources(selector placementv1beta1.ResourceSelectorTerm, placementKey types.NamespacedName) ([]runtime.Object, error) { klog.V(2).InfoS("Start to fetch resources by the selector", "selector", selector, "placement", placementKey) gk := schema.GroupKind{ Group: selector.Group, Kind: selector.Kind, } - restMapping, err := r.RestMapper.RESTMapping(gk, selector.Version) + restMapping, err := rs.RestMapper.RESTMapping(gk, selector.Version) if err != nil { - return nil, controller.NewUserError(fmt.Errorf("invalid placement %s, failed to get GVR of the selector: %w", placementKey, err)) + return nil, NewUserError(fmt.Errorf("invalid placement %s, failed to get GVR of the selector: %w", placementKey, err)) } gvr := restMapping.Resource gvk := schema.GroupVersionKind{ @@ -200,26 +445,26 @@ func (r *Reconciler) fetchResources(selector fleetv1beta1.ResourceSelectorTerm, Kind: selector.Kind, } - isNamespacedResource := !r.InformerManager.IsClusterScopedResources(gvk) + isNamespacedResource := !rs.InformerManager.IsClusterScopedResources(gvk) if isNamespacedResource && placementKey.Namespace == "" { // If it's a namespace-scoped resource but placement has no namespace, return error. 
err := fmt.Errorf("invalid placement %s: cannot select namespace-scoped resource %v in a clusterResourcePlacement", placementKey, gvr) klog.ErrorS(err, "Invalid resource selector", "selector", selector) - return nil, controller.NewUserError(err) + return nil, NewUserError(err) } else if !isNamespacedResource && placementKey.Namespace != "" { // If it's a cluster-scoped resource but placement has a namespace, return error. err := fmt.Errorf("invalid placement %s: cannot select cluster-scoped resource %v in a resourcePlacement", placementKey, gvr) klog.ErrorS(err, "Invalid resource selector", "selector", selector) - return nil, controller.NewUserError(err) + return nil, NewUserError(err) } - if !r.InformerManager.IsInformerSynced(gvr) { + if !rs.InformerManager.IsInformerSynced(gvr) { err := fmt.Errorf("informer cache for %+v is not synced yet", restMapping.Resource) klog.ErrorS(err, "Informer cache is not synced", "gvr", gvr, "placement", placementKey) - return nil, controller.NewExpectedBehaviorError(err) + return nil, NewExpectedBehaviorError(err) } - lister := r.InformerManager.Lister(gvr) + lister := rs.InformerManager.Lister(gvr) // TODO: validator should enforce the mutual exclusiveness between the `name` and `labelSelector` fields if len(selector.Name) != 0 { @@ -234,10 +479,10 @@ func (r *Reconciler) fetchResources(selector fleetv1beta1.ResourceSelectorTerm, if err != nil { klog.ErrorS(err, "Cannot get the resource", "gvr", gvr, "name", selector.Name, "namespace", placementKey.Namespace) - return nil, controller.NewAPIServerError(true, client.IgnoreNotFound(err)) + return nil, NewAPIServerError(true, client.IgnoreNotFound(err)) } - shouldInclude, err := r.shouldPropagateObj(placementKey.Namespace, placementKey.Name, obj) + shouldInclude, err := rs.ShouldPropagateObj(placementKey.Namespace, placementKey.Name, obj) if err != nil { return nil, err } @@ -254,7 +499,7 @@ func (r *Reconciler) fetchResources(selector fleetv1beta1.ResourceSelectorTerm, // TODO: validator should enforce the validity of the labelSelector labelSelector, err = metav1.LabelSelectorAsSelector(selector.LabelSelector) if err != nil { - return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err)) + return nil, NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err)) } } @@ -268,12 +513,12 @@ func (r *Reconciler) fetchResources(selector fleetv1beta1.ResourceSelectorTerm, } if err != nil { klog.ErrorS(err, "Cannot list all the objects", "gvr", gvr, "labelSelector", labelSelector, "placement", placementKey) - return nil, controller.NewAPIServerError(true, err) + return nil, NewAPIServerError(true, err) } // go ahead and claim all objects by adding a finalizer and insert the placement in its annotation for i := 0; i < len(objects); i++ { - shouldInclude, err := r.shouldPropagateObj(placementKey.Namespace, placementKey.Name, objects[i]) + shouldInclude, err := rs.ShouldPropagateObj(placementKey.Namespace, placementKey.Name, objects[i]) if err != nil { return nil, err } @@ -285,7 +530,7 @@ func (r *Reconciler) fetchResources(selector fleetv1beta1.ResourceSelectorTerm, return selectedObjs, nil } -func (r *Reconciler) shouldPropagateObj(namespace, placementName string, obj runtime.Object) (bool, error) { +func (rs *ResourceSelectorResolver) ShouldPropagateObj(namespace, placementName string, obj runtime.Object) (bool, error) { uObj := obj.DeepCopyObject().(*unstructured.Unstructured) uObjKObj := klog.KObj(uObj) if 
uObj.GetDeletionTimestamp() != nil { @@ -294,7 +539,7 @@ func (r *Reconciler) shouldPropagateObj(namespace, placementName string, obj run return false, nil } - shouldInclude, err := utils.ShouldPropagateObj(r.InformerManager, uObj, r.EnableWorkload) + shouldInclude, err := ShouldPropagateObj(rs.InformerManager, uObj, rs.EnableWorkload) if err != nil { klog.ErrorS(err, "Cannot determine if we should propagate an object", "namespace", namespace, "placement", placementName, "object", uObjKObj) return false, err @@ -306,227 +551,67 @@ func (r *Reconciler) shouldPropagateObj(namespace, placementName string, obj run return true, nil } -// fetchNamespaceResources retrieves all the objects for a ResourceSelectorTerm that is for namespace. -func (r *Reconciler) fetchNamespaceResources(selector fleetv1beta1.ResourceSelectorTerm, placementName string) ([]runtime.Object, error) { - klog.V(2).InfoS("start to fetch the namespace resources by the selector", "selector", selector) - var resources []runtime.Object - - if len(selector.Name) != 0 { - // just a single namespace - objs, err := r.fetchAllResourcesInOneNamespace(selector.Name, placementName) - if err != nil { - klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", selector.Name) - return nil, err +// ShouldPropagateObj decides if one should propagate the object. +// PVCs are only propagated when enableWorkload is false (workloads not allowed on hub). +func ShouldPropagateObj(informerManager informer.Manager, uObj *unstructured.Unstructured, enableWorkload bool) (bool, error) { + // TODO: add more special handling for different resource kind + switch uObj.GroupVersionKind() { + case appv1.SchemeGroupVersion.WithKind(utils.ReplicaSetKind): + // Skip ReplicaSets if they are managed by Deployments (have owner references). + // Standalone ReplicaSets (without owners) can be propagated. + if len(uObj.GetOwnerReferences()) > 0 { + return false, nil } - return objs, err - } - - // go through each namespace - var labelSelector labels.Selector - var err error - if selector.LabelSelector == nil { - labelSelector = labels.Everything() - } else { - labelSelector, err = metav1.LabelSelectorAsSelector(selector.LabelSelector) - if err != nil { - return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot convert the label selector to a selector: %w", err)) + case appv1.SchemeGroupVersion.WithKind("ControllerRevision"): + // Skip ControllerRevisions if they are managed by DaemonSets/StatefulSets (have owner references). + // Standalone ControllerRevisions (without owners) can be propagated. + if len(uObj.GetOwnerReferences()) > 0 { + return false, nil } - } - namespaces, err := r.InformerManager.Lister(utils.NamespaceGVR).List(labelSelector) - if err != nil { - klog.ErrorS(err, "Cannot list all the namespaces by the label selector", "labelSelector", labelSelector, "placement", placementName) - return nil, controller.NewAPIServerError(true, err) - } - - for _, namespace := range namespaces { - ns, err := meta.Accessor(namespace) - if err != nil { - return nil, controller.NewUnexpectedBehaviorError(fmt.Errorf("cannot get the name of a namespace object: %w", err)) + case corev1.SchemeGroupVersion.WithKind(utils.ConfigMapKind): + // Skip the built-in custom CA certificate created in the namespace. 
+ if uObj.GetName() == "kube-root-ca.crt" { + return false, nil } - objs, err := r.fetchAllResourcesInOneNamespace(ns.GetName(), placementName) - if err != nil { - klog.ErrorS(err, "failed to fetch all the selected resource in a namespace", "namespace", ns.GetName()) - return nil, err + case corev1.SchemeGroupVersion.WithKind("ServiceAccount"): + // Skip the default service account created in the namespace. + if uObj.GetName() == "default" { + return false, nil } - resources = append(resources, objs...) - } - return resources, nil -} - -// fetchAllResourcesInOneNamespace retrieves all the objects inside a single namespace which includes the namespace itself. -func (r *Reconciler) fetchAllResourcesInOneNamespace(namespaceName string, placeName string) ([]runtime.Object, error) { - var resources []runtime.Object - - if !utils.ShouldPropagateNamespace(namespaceName, r.SkippedNamespaces) { - err := fmt.Errorf("invalid clusterRresourcePlacement %s: namespace %s is not allowed to propagate", placeName, namespaceName) - return nil, controller.NewUserError(err) - } - - klog.V(2).InfoS("start to fetch all the resources inside a namespace", "namespace", namespaceName) - // select the namespace object itself - obj, err := r.InformerManager.Lister(utils.NamespaceGVR).Get(namespaceName) - if err != nil { - klog.ErrorS(err, "cannot get the namespace", "namespace", namespaceName) - return nil, controller.NewAPIServerError(true, client.IgnoreNotFound(err)) - } - nameSpaceObj := obj.DeepCopyObject().(*unstructured.Unstructured) - if nameSpaceObj.GetDeletionTimestamp() != nil { - // skip a to be deleted namespace - klog.V(2).InfoS("skip the deleting namespace resources by the selector", - "placeName", placeName, "namespace", namespaceName) - return resources, nil - } - resources = append(resources, obj) - - trackedResource := r.InformerManager.GetNameSpaceScopedResources() - for _, gvr := range trackedResource { - if !utils.ShouldProcessResource(gvr, r.RestMapper, r.ResourceConfig) { - continue + case corev1.SchemeGroupVersion.WithKind("Secret"): + // The secret, with type 'kubernetes.io/service-account-token', is created along with `ServiceAccount` should be + // prevented from propagating. + var secret corev1.Secret + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(uObj.Object, &secret); err != nil { + return false, NewUnexpectedBehaviorError(fmt.Errorf("failed to convert a secret object %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err)) } - if !r.InformerManager.IsInformerSynced(gvr) { - return nil, controller.NewExpectedBehaviorError(fmt.Errorf("informer cache for %+v is not synced yet", gvr)) + if secret.Type == corev1.SecretTypeServiceAccountToken { + return false, nil } - lister := r.InformerManager.Lister(gvr) - objs, err := lister.ByNamespace(namespaceName).List(labels.Everything()) - if err != nil { - klog.ErrorS(err, "Cannot list all the objects in namespace", "gvr", gvr, "namespace", namespaceName) - return nil, controller.NewAPIServerError(true, err) + case corev1.SchemeGroupVersion.WithKind("PersistentVolumeClaim"): + // Skip PersistentVolumeClaims by default to avoid conflicts with the PVCs created by statefulset + // This only happens if the workloads are allowed to run on the hub cluster. 
+ if enableWorkload { + return false, nil } - for _, obj := range objs { - shouldInclude, err := r.shouldPropagateObj(namespaceName, placeName, obj) - if err != nil { - return nil, err - } - if shouldInclude { - resources = append(resources, obj) + case corev1.SchemeGroupVersion.WithKind("Endpoints"): + // we assume that all endpoints with the same name of a service is created by the service controller + if _, err := informerManager.Lister(utils.ServiceGVR).ByNamespace(uObj.GetNamespace()).Get(uObj.GetName()); err != nil { + if apierrors.IsNotFound(err) { + // there is no service of the same name as the end point, + // we assume that this endpoint is created by the user + return true, nil } + return false, NewAPIServerError(true, fmt.Errorf("failed to get the service %s in namespace %s: %w", uObj.GetName(), uObj.GetNamespace(), err)) } - } - - return resources, nil -} - -// generateRawContent strips all the unnecessary fields to prepare the objects for dispatch. -func generateRawContent(object *unstructured.Unstructured) ([]byte, error) { - // Make a deep copy of the object as we are modifying it. - object = object.DeepCopy() - // we keep the annotation/label/finalizer/owner references/delete grace period - object.SetResourceVersion("") - object.SetGeneration(0) - object.SetUID("") - object.SetSelfLink("") - object.SetDeletionTimestamp(nil) - object.SetManagedFields(nil) - - annots := object.GetAnnotations() - if annots != nil { - // Remove kubectl last applied annotation if exist - delete(annots, corev1.LastAppliedConfigAnnotation) - // Remove the revision annotation set by deployment controller. - delete(annots, deployment.RevisionAnnotation) - if len(annots) == 0 { - object.SetAnnotations(nil) - } else { - object.SetAnnotations(annots) - } - } - // Remove all the owner references as the UID in the owner reference can't be transferred to - // the member clusters - // TODO: Establish a way to keep the ownership relation through work-api - object.SetOwnerReferences(nil) - unstructured.RemoveNestedField(object.Object, "metadata", "creationTimestamp") - unstructured.RemoveNestedField(object.Object, "status") - - // TODO: see if there are other cases that we may have some extra fields - if object.GetKind() == "Service" && object.GetAPIVersion() == "v1" { - if clusterIP, exist, _ := unstructured.NestedString(object.Object, "spec", "clusterIP"); exist && clusterIP != corev1.ClusterIPNone { - unstructured.RemoveNestedField(object.Object, "spec", "clusterIP") - unstructured.RemoveNestedField(object.Object, "spec", "clusterIPs") - } - // We should remove all node ports that are assigned by hubcluster if any. 
- unstructured.RemoveNestedField(object.Object, "spec", "healthCheckNodePort") - - vals, found, err := unstructured.NestedFieldNoCopy(object.Object, "spec", "ports") - if found && err == nil { - if ports, ok := vals.([]interface{}); ok { - for i := range ports { - if each, ok := ports[i].(map[string]interface{}); ok { - delete(each, "nodePort") - } - } - } - } - if err != nil { - return nil, fmt.Errorf("failed to get the ports field in Service object, name =%s: %w", object.GetName(), err) - } - } else if object.GetKind() == "Job" && object.GetAPIVersion() == batchv1.SchemeGroupVersion.String() { - if manualSelector, exist, _ := unstructured.NestedBool(object.Object, "spec", "manualSelector"); !exist || !manualSelector { - // remove the selector field and labels added by the api-server if the job is not created with manual selector - // whose value conflict with the ones created by the member cluster api server - // https://github.com/kubernetes/kubernetes/blob/d4fde1e92a83cb533ae63b3abe9d49f08efb7a2f/pkg/registry/batch/job/strategy.go#L219 - // k8s used to add an old label called "controller-uid" but use a new label called "batch.kubernetes.io/controller-uid" after 1.26 - unstructured.RemoveNestedField(object.Object, "spec", "selector", "matchLabels", "controller-uid") - unstructured.RemoveNestedField(object.Object, "spec", "selector", "matchLabels", "batch.kubernetes.io/controller-uid") - unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "creationTimestamp") - unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "labels", "controller-uid") - unstructured.RemoveNestedField(object.Object, "spec", "template", "metadata", "labels", "batch.kubernetes.io/controller-uid") - } - } - - rawContent, err := object.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to marshal the unstructured object gvk = %s, name =%s: %w", object.GroupVersionKind(), object.GetName(), err) - } - return rawContent, nil -} - -// generateResourceContent creates a resource content from the unstructured obj. -func generateResourceContent(object *unstructured.Unstructured) (*fleetv1beta1.ResourceContent, error) { - rawContent, err := generateRawContent(object) - if err != nil { - return nil, controller.NewUnexpectedBehaviorError(err) - } - return &fleetv1beta1.ResourceContent{ - RawExtension: runtime.RawExtension{Raw: rawContent}, - }, nil -} - -// selectResourcesForPlacement selects the resources according to the placement resourceSelectors. -// It also generates an array of resource content and resource identifier based on the selected resources. -// It also returns the number of envelope configmaps so the CRP controller can have the right expectation of the number of work objects. 
-func (r *Reconciler) selectResourcesForPlacement(placementObj fleetv1beta1.PlacementObj) (int, []fleetv1beta1.ResourceContent, []fleetv1beta1.ResourceIdentifier, error) { - envelopeObjCount := 0 - selectedObjects, err := r.gatherSelectedResource(types.NamespacedName{ - Name: placementObj.GetName(), - Namespace: placementObj.GetNamespace(), - }, placementObj.GetPlacementSpec().ResourceSelectors) - if err != nil { - return 0, nil, nil, err - } - - resources := make([]fleetv1beta1.ResourceContent, len(selectedObjects)) - resourcesIDs := make([]fleetv1beta1.ResourceIdentifier, len(selectedObjects)) - for i, unstructuredObj := range selectedObjects { - rc, err := generateResourceContent(unstructuredObj) - if err != nil { - return 0, nil, nil, err - } - uGVK := unstructuredObj.GetObjectKind().GroupVersionKind().GroupKind() - switch uGVK { - case utils.ClusterResourceEnvelopeGK: - envelopeObjCount++ - case utils.ResourceEnvelopeGK: - envelopeObjCount++ - } - resources[i] = *rc - ri := fleetv1beta1.ResourceIdentifier{ - Group: unstructuredObj.GroupVersionKind().Group, - Version: unstructuredObj.GroupVersionKind().Version, - Kind: unstructuredObj.GroupVersionKind().Kind, - Name: unstructuredObj.GetName(), - Namespace: unstructuredObj.GetNamespace(), + // we find a service of the same name as the endpoint, we assume it's created by the service + return false, nil + case discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice"): + // all EndpointSlice created by the EndpointSlice controller has a managed by label + if _, exist := uObj.GetLabels()[discoveryv1.LabelManagedBy]; exist { + // do not propagate hub cluster generated endpoint slice + return false, nil } - resourcesIDs[i] = ri } - return envelopeObjCount, resources, resourcesIDs, nil + return true, nil } diff --git a/pkg/controllers/placement/resource_selector_test.go b/pkg/utils/controller/resource_selector_resolver_test.go similarity index 90% rename from pkg/controllers/placement/resource_selector_test.go rename to pkg/utils/controller/resource_selector_resolver_test.go index a0a73b2ac..06b783d10 100644 --- a/pkg/controllers/placement/resource_selector_test.go +++ b/pkg/utils/controller/resource_selector_resolver_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package placement +package controller import ( "errors" @@ -38,7 +38,6 @@ import ( fleetv1beta1 "go.goms.io/fleet/apis/placement/v1beta1" "go.goms.io/fleet/pkg/utils" - "go.goms.io/fleet/pkg/utils/controller" testinformer "go.goms.io/fleet/test/utils/informer" ) @@ -533,7 +532,7 @@ func TestGatherSelectedResource(t *testing.T) { Listers: map[schema.GroupVersionResource]*testinformer.FakeLister{}, }, want: nil, - wantError: controller.ErrUserError, + wantError: ErrUserError, }, { name: "should handle single resource selection successfully", @@ -606,7 +605,7 @@ func TestGatherSelectedResource(t *testing.T) { }, } }(), - wantError: controller.ErrUnexpectedBehavior, + wantError: ErrUnexpectedBehavior, }, { name: "should return error using label selector when informer manager returns error", @@ -630,7 +629,7 @@ func TestGatherSelectedResource(t *testing.T) { }, } }(), - wantError: controller.ErrAPIServerError, + wantError: ErrAPIServerError, }, { name: "should return only non-deleting resources when mixed with deleting resources", @@ -745,7 +744,7 @@ func TestGatherSelectedResource(t *testing.T) { }, } }(), - wantError: controller.ErrUserError, + wantError: ErrUserError, }, { name: "should sort resources according to apply order", @@ -795,7 +794,7 @@ func TestGatherSelectedResource(t *testing.T) { Listers: map[schema.GroupVersionResource]*testinformer.FakeLister{}, }, want: nil, - wantError: controller.ErrUserError, + wantError: ErrUserError, }, { name: "should sort resources for cluster scoped placement", @@ -1027,7 +1026,7 @@ func TestGatherSelectedResource(t *testing.T) { NamespaceScopedResources: []schema.GroupVersionResource{utils.DeploymentGVR, utils.ConfigMapGVR}, } }(), - wantError: controller.ErrUserError, + wantError: ErrUserError, }, { name: "should return error when selecting a reserved namespace for cluster scoped placement", @@ -1057,7 +1056,7 @@ func TestGatherSelectedResource(t *testing.T) { NamespaceScopedResources: []schema.GroupVersionResource{utils.DeploymentGVR, utils.ConfigMapGVR}, } }(), - wantError: controller.ErrUserError, + wantError: ErrUserError, }, { name: "should return empty result when informer manager returns not found error for cluster scoped placement", @@ -1109,7 +1108,7 @@ func TestGatherSelectedResource(t *testing.T) { }, } }(), - wantError: controller.ErrUnexpectedBehavior, + wantError: ErrUnexpectedBehavior, }, { name: "should return error using label selector when informer manager returns error (getting namespace) for cluster scoped placement", @@ -1134,7 +1133,7 @@ func TestGatherSelectedResource(t *testing.T) { }, } }(), - wantError: controller.ErrAPIServerError, + wantError: ErrAPIServerError, }, { name: "should return error when informer manager returns non-NotFound error (getting deployment) for cluster scoped placement", @@ -1162,7 +1161,7 @@ func TestGatherSelectedResource(t *testing.T) { NamespaceScopedResources: []schema.GroupVersionResource{utils.DeploymentGVR}, } }(), - wantError: controller.ErrUnexpectedBehavior, + wantError: ErrUnexpectedBehavior, }, { name: "should skip reserved resources for namespaced placement", @@ -1228,7 +1227,7 @@ func TestGatherSelectedResource(t *testing.T) { InformerSynced: ptr.To(false), } }(), - wantError: controller.ErrExpectedBehavior, + wantError: ErrExpectedBehavior, }, { name: "should return error when informer cache is not synced for cluster scoped placement", @@ -1251,7 +1250,7 @@ func TestGatherSelectedResource(t *testing.T) { InformerSynced: ptr.To(false), } }(), - wantError: 
controller.ErrExpectedBehavior, + wantError: ErrExpectedBehavior, }, { name: "should return error when informer cache is not synced for cluster scoped placement with namespace resources", @@ -1280,7 +1279,7 @@ func TestGatherSelectedResource(t *testing.T) { InformerSynced: ptr.To(false), } }(), - wantError: controller.ErrExpectedBehavior, + wantError: ErrExpectedBehavior, }, { name: "should return error when shouldPropagateObj returns error", @@ -1308,7 +1307,7 @@ func TestGatherSelectedResource(t *testing.T) { }, } }(), - wantError: controller.ErrUnexpectedBehavior, + wantError: ErrUnexpectedBehavior, }, { name: "should return error by selecting all the endpoints when shouldPropagateObj returns error", @@ -1335,19 +1334,19 @@ func TestGatherSelectedResource(t *testing.T) { }, } }(), - wantError: controller.ErrUnexpectedBehavior, + wantError: ErrUnexpectedBehavior, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r := &Reconciler{ + rsr := &ResourceSelectorResolver{ ResourceConfig: tt.resourceConfig, InformerManager: tt.informerManager, RestMapper: newFakeRESTMapper(), } - got, err := r.gatherSelectedResource(tt.placementName, tt.selectors) + got, err := rsr.gatherSelectedResource(tt.placementName, tt.selectors) if gotErr, wantErr := err != nil, tt.wantError != nil; gotErr != wantErr || !errors.Is(err, tt.wantError) { t.Fatalf("gatherSelectedResource() = %v, want error %v", err, tt.wantError) } @@ -1963,3 +1962,260 @@ func TestSortResources(t *testing.T) { }) } } + +func TestShouldPropagateObj(t *testing.T) { + tests := []struct { + name string + obj map[string]interface{} + ownerReferences []metav1.OwnerReference + enableWorkload bool + want bool + }{ + { + name: "standalone replicaset without ownerReferences should propagate", + obj: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "ReplicaSet", + "metadata": map[string]interface{}{ + "name": "standalone-rs", + "namespace": "default", + }, + }, + ownerReferences: nil, + enableWorkload: true, + want: true, + }, + { + name: "standalone replicaset without ownerReferences should propagate if workload is disabled", + obj: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "ReplicaSet", + "metadata": map[string]interface{}{ + "name": "standalone-rs", + "namespace": "default", + }, + }, + ownerReferences: nil, + enableWorkload: false, + want: true, + }, + { + name: "standalone pod without ownerReferences should propagate", + obj: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "standalone-pod", + "namespace": "default", + }, + }, + ownerReferences: nil, + enableWorkload: true, + want: true, + }, + { + name: "replicaset with deployment owner should NOT propagate", + obj: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "ReplicaSet", + "metadata": map[string]interface{}{ + "name": "test-deploy-abc123", + "namespace": "default", + }, + }, + ownerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "Deployment", + Name: "test-deploy", + UID: "12345", + }, + }, + enableWorkload: true, + want: false, + }, + { + name: "pod owned by replicaset - passes ShouldPropagateObj but filtered by resource config", + obj: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "name": "test-deploy-abc123-xyz", + "namespace": "default", + }, + }, + ownerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "ReplicaSet", + Name: "test-deploy-abc123", + UID: 
"67890", + }, + }, + enableWorkload: false, + want: true, // ShouldPropagateObj doesn't filter Pods - they're filtered by NewResourceConfig + }, + { + name: "controllerrevision owned by daemonset should NOT propagate", + obj: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "ControllerRevision", + "metadata": map[string]interface{}{ + "name": "test-ds-7b9848797f", + "namespace": "default", + }, + }, + ownerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "DaemonSet", + Name: "test-ds", + UID: "abcdef", + }, + }, + enableWorkload: false, + want: false, + }, + { + name: "controllerrevision owned by statefulset should NOT propagate", + obj: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "ControllerRevision", + "metadata": map[string]interface{}{ + "name": "test-ss-7878b4b446", + "namespace": "default", + }, + }, + ownerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "test-ss", + UID: "fedcba", + }, + }, + enableWorkload: false, + want: false, + }, + { + name: "standalone controllerrevision without owner should propagate", + obj: map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "ControllerRevision", + "metadata": map[string]interface{}{ + "name": "custom-revision", + "namespace": "default", + }, + }, + ownerReferences: nil, + enableWorkload: false, + want: true, + }, + { + name: "PVC should propagate when workload is disabled", + obj: map[string]interface{}{ + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": map[string]interface{}{ + "name": "test-pvc", + "namespace": "default", + }, + }, + ownerReferences: nil, + enableWorkload: false, + want: true, + }, + { + name: "PVC should NOT propagate when workload is enabled", + obj: map[string]interface{}{ + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": map[string]interface{}{ + "name": "test-pvc", + "namespace": "default", + }, + }, + ownerReferences: nil, + enableWorkload: true, + want: false, + }, + { + name: "PVC with ownerReferences should NOT propagate when workload is enabled", + obj: map[string]interface{}{ + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": map[string]interface{}{ + "name": "data-statefulset-0", + "namespace": "default", + }, + }, + ownerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "statefulset", + UID: "sts-uid", + }, + }, + enableWorkload: true, + want: false, + }, + { + name: "Default ServiceAccount in namespace should NOT propagate", + obj: map[string]interface{}{ + "apiVersion": "v1", + "kind": "ServiceAccount", + "metadata": map[string]interface{}{ + "name": "default", + "namespace": "test-ns", + }, + }, + want: false, + }, + { + name: "service-account-token secret should NOT propagate", + obj: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Secret", + "metadata": map[string]interface{}{ + "name": "sa-token", + "namespace": "test", + }, + "type": string(corev1.SecretTypeServiceAccountToken), + }, + want: false, + }, + { + name: "endpointslice with managed-by label should NOT propagate", + obj: map[string]interface{}{ + "apiVersion": "discovery.k8s.io/v1", + "kind": "EndpointSlice", + "metadata": map[string]interface{}{ + "name": "test-endpointslice", + "labels": map[string]interface{}{ + "endpointslice.kubernetes.io/managed-by": "endpointslice-controller", + }, + }, + }, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + uObj := 
&unstructured.Unstructured{Object: tt.obj} + if tt.ownerReferences != nil { + uObj.SetOwnerReferences(tt.ownerReferences) + } + + got, err := ShouldPropagateObj(nil, uObj, tt.enableWorkload) + if err != nil { + t.Errorf("ShouldPropagateObj() error = %v", err) + return + } + if got != tt.want { + t.Errorf("ShouldPropagateObj() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/utils/informer/readiness/readiness_test.go b/pkg/utils/informer/readiness/readiness_test.go index fab796e65..5fb7fb97e 100644 --- a/pkg/utils/informer/readiness/readiness_test.go +++ b/pkg/utils/informer/readiness/readiness_test.go @@ -38,7 +38,7 @@ func TestReadinessChecker(t *testing.T) { name: "nil informer", resourceInformer: nil, expectError: true, - errorContains: "resource informer not initialized", + errorContains: "resource informer is nil", }, { name: "no resources registered", diff --git a/test/e2e/framework/cluster.go b/test/e2e/framework/cluster.go index 9c15a33a1..ae705934a 100644 --- a/test/e2e/framework/cluster.go +++ b/test/e2e/framework/cluster.go @@ -18,6 +18,7 @@ package framework import ( "os" + "path/filepath" "github.com/onsi/gomega" "k8s.io/apimachinery/pkg/api/meta" @@ -33,7 +34,19 @@ import ( "go.goms.io/fleet/pkg/propertyprovider/azure/trackers" ) -var kubeconfigPath = os.Getenv("KUBECONFIG") +var ( + kubeconfigPath string +) + +func init() { + kubeconfigPath = os.Getenv("KUBECONFIG") + if kubeconfigPath == "" { + // Default to $HOME/.kube/config like kubectl does. + if home, err := os.UserHomeDir(); err == nil { + kubeconfigPath = filepath.Join(home, ".kube", "config") + } + } +} // Cluster object defines the required clients based on the kubeconfig of the test cluster. type Cluster struct { diff --git a/test/e2e/setup.sh b/test/e2e/setup.sh index f53d4d3a8..7de3df3e1 100755 --- a/test/e2e/setup.sh +++ b/test/e2e/setup.sh @@ -202,6 +202,8 @@ kind export kubeconfig --name $HUB_CLUSTER HUB_SERVER_URL="https://$(docker inspect $HUB_CLUSTER-control-plane --format='{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'):6443" # Install the member agents and related components +# Note that the work applier in the member agent are set to requeue at max. every 5 seconds instead of using the default +# exponential backoff behavior; this is to accommodate some of the timeout settings in the E2E test specs. 
for (( i=0; i<${MEMBER_CLUSTER_COUNT}; i++ )); do kind export kubeconfig --name "${MEMBER_CLUSTERS[$i]}" @@ -222,6 +224,8 @@ do --set logVerbosity=5 \ --set namespace=fleet-system \ --set enableV1Beta1APIs=true \ + --set workApplierRequeueRateLimiterMaxSlowBackoffDelaySeconds=5 \ + --set workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds=5 \ --set propertyProvider=$PROPERTY_PROVIDER \ --set region=${REGIONS[$i]} \ $( [ "$PROPERTY_PROVIDER" = "azure" ] && echo "-f azure_valid_config.yaml" ) @@ -242,6 +246,8 @@ do --set logVerbosity=5 \ --set namespace=fleet-system \ --set enableV1Beta1APIs=true \ + --set workApplierRequeueRateLimiterMaxSlowBackoffDelaySeconds=5 \ + --set workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds=5 \ --set propertyProvider=$PROPERTY_PROVIDER \ $( [ "$PROPERTY_PROVIDER" = "azure" ] && echo "-f azure_valid_config.yaml" ) fi diff --git a/test/e2e/setup_test.go b/test/e2e/setup_test.go index 2e8f353c5..4e2234b76 100644 --- a/test/e2e/setup_test.go +++ b/test/e2e/setup_test.go @@ -76,7 +76,6 @@ const ( hubClusterSAName = "fleet-hub-agent" fleetSystemNS = "fleet-system" - kubeConfigPathEnvVarName = "KUBECONFIG" propertyProviderEnvVarName = "PROPERTY_PROVIDER" azurePropertyProviderEnvVarValue = "azure" fleetClusterResourceIDAnnotationKey = "fleet.azure.com/cluster-resource-id" @@ -323,9 +322,6 @@ func beforeSuiteForAllProcesses() { klog.InitFlags(fs) Expect(fs.Parse([]string{"--v", "5", "-add_dir_header", "true"})).Should(Succeed()) - // Check if the required environment variable, which specifies the path to kubeconfig file, has been set. - Expect(os.Getenv(kubeConfigPathEnvVarName)).NotTo(BeEmpty(), "Required environment variable KUBECONFIG is not set") - resourceSnapshotCreationMinimumIntervalEnv := os.Getenv("RESOURCE_SNAPSHOT_CREATION_MINIMUM_INTERVAL") if resourceSnapshotCreationMinimumIntervalEnv == "" { // If the environment variable is not set, use a default value. diff --git a/test/upgrade/after/setup_test.go b/test/upgrade/after/setup_test.go index 17a10b4ef..44f325f68 100644 --- a/test/upgrade/after/setup_test.go +++ b/test/upgrade/after/setup_test.go @@ -63,8 +63,6 @@ const ( memberCluster1EastProdSAName = "fleet-member-agent-cluster-1" memberCluster2EastCanarySAName = "fleet-member-agent-cluster-2" memberCluster3WestProdSAName = "fleet-member-agent-cluster-3" - - kubeConfigPathEnvVarName = "KUBECONFIG" ) const ( @@ -183,9 +181,6 @@ func beforeSuiteForAllProcesses() { klog.InitFlags(fs) Expect(fs.Parse([]string{"--v", "5", "-add_dir_header", "true"})).Should(Succeed()) - // Check if the required environment variable, which specifies the path to kubeconfig file, has been set. - Expect(os.Getenv(kubeConfigPathEnvVarName)).NotTo(BeEmpty(), "Required environment variable KUBECONFIG is not set") - // Initialize the cluster objects and their clients. 
hubCluster = framework.NewCluster(hubClusterName, "", scheme, nil) Expect(hubCluster).NotTo(BeNil(), "Failed to initialize cluster object") diff --git a/test/upgrade/before/setup_test.go b/test/upgrade/before/setup_test.go index 7a8687e5b..0d066016a 100644 --- a/test/upgrade/before/setup_test.go +++ b/test/upgrade/before/setup_test.go @@ -67,8 +67,6 @@ const ( hubClusterSAName = "fleet-hub-agent" fleetSystemNS = "fleet-system" - - kubeConfigPathEnvVarName = "KUBECONFIG" ) const ( @@ -207,9 +205,6 @@ func beforeSuiteForAllProcesses() { klog.InitFlags(fs) Expect(fs.Parse([]string{"--v", "5", "-add_dir_header", "true"})).Should(Succeed()) - // Check if the required environment variable, which specifies the path to kubeconfig file, has been set. - Expect(os.Getenv(kubeConfigPathEnvVarName)).NotTo(BeEmpty(), "Required environment variable KUBECONFIG is not set") - // Initialize the cluster objects and their clients. hubCluster = framework.NewCluster(hubClusterName, "", scheme, nil) Expect(hubCluster).NotTo(BeNil(), "Failed to initialize cluster object")
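Note on the refactor above: resource selection now lives in a reusable ResourceSelectorResolver under pkg/utils/controller instead of the placement reconciler. Below is a minimal sketch of how a consuming controller might wire it up and call SelectResourcesForPlacement; the hypotheticalReconciler type, its lowercase field names, and the selectResources helper are illustrative assumptions and not part of this change — only the ResourceSelectorResolver fields and the SelectResourcesForPlacement signature are taken from the diff.

package placement

import (
	"k8s.io/apimachinery/pkg/api/meta"

	placementv1beta1 "go.goms.io/fleet/apis/placement/v1beta1"
	"go.goms.io/fleet/pkg/utils"
	"go.goms.io/fleet/pkg/utils/controller"
	"go.goms.io/fleet/pkg/utils/informer"
)

// hypotheticalReconciler stands in for whichever controller consumes the resolver;
// its fields mirror what the resolver needs.
type hypotheticalReconciler struct {
	skippedNamespaces map[string]bool
	resourceConfig    *utils.ResourceConfig
	informerManager   informer.Manager
	restMapper        meta.RESTMapper
	enableWorkload    bool
}

func (r *hypotheticalReconciler) selectResources(placementObj placementv1beta1.PlacementObj) error {
	resolver := &controller.ResourceSelectorResolver{
		SkippedNamespaces: r.skippedNamespaces,
		ResourceConfig:    r.resourceConfig,
		InformerManager:   r.informerManager,
		RestMapper:        r.restMapper,
		EnableWorkload:    r.enableWorkload,
	}
	// envelopeObjCount tells the caller how many envelope objects were selected so it can
	// set the right expectation for the number of work objects; resources and resourceIDs
	// carry the marshaled content and identifiers of everything that was selected.
	envelopeObjCount, resources, resourceIDs, err := resolver.SelectResourcesForPlacement(placementObj)
	if err != nil {
		return err
	}
	_ = envelopeObjCount
	_ = resources
	_ = resourceIDs
	return nil
}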
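Also for reference, a hedged sketch of a package-level test exercising generateRawContent's sanitization of a Service, in the style of the existing tests in package controller. The test name and concrete field values are assumptions for illustration; the stripped fields (resourceVersion, uid, clusterIP/clusterIPs, nodePort) follow the logic shown in the diff.

package controller

import (
	"strings"
	"testing"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func TestGenerateRawContentStripsServiceFields(t *testing.T) {
	// A Service carrying hub-assigned fields that should not be dispatched to member clusters.
	svc := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Service",
		"metadata": map[string]interface{}{
			"name":            "demo",
			"namespace":       "default",
			"resourceVersion": "42",
			"uid":             "abc-123",
		},
		"spec": map[string]interface{}{
			"clusterIP":  "10.0.0.10",
			"clusterIPs": []interface{}{"10.0.0.10"},
			"ports": []interface{}{
				map[string]interface{}{"port": int64(80), "nodePort": int64(31000)},
			},
		},
	}}

	raw, err := generateRawContent(svc)
	if err != nil {
		t.Fatalf("generateRawContent() error = %v", err)
	}
	got := string(raw)
	// The hub-assigned fields should not survive the round trip.
	for _, field := range []string{"resourceVersion", "uid", "clusterIP", "nodePort"} {
		if strings.Contains(got, field) {
			t.Errorf("generateRawContent() kept %q, want it stripped", field)
		}
	}
}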