Skip to content

Commit 0d1783f

Browse files
authored
Merge pull request #89 from kcp-dev/endpointslices
Support APIExportEndpointSlices / kcp 0.28
2 parents 09d43ad + f3e6ee2 commit 0d1783f

File tree

12 files changed

+915
-164
lines changed

12 files changed

+915
-164
lines changed

.prow.yaml

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ presubmits:
8888
memory: 4Gi
8989
cpu: 2
9090

91-
- name: pull-api-syncagent-test-e2e
91+
- name: pull-api-syncagent-test-e2e-kcp-0.27
9292
always_run: true
9393
decorate: true
9494
clone_uri: "https://github.com/kcp-dev/api-syncagent"
@@ -99,10 +99,29 @@ presubmits:
9999
- image: ghcr.io/kcp-dev/infra/build:1.24.3-1
100100
command:
101101
- hack/ci/run-e2e-tests.sh
102+
env:
103+
- name: KCP_VERSION
104+
value: '0.27.1'
105+
resources:
106+
requests:
107+
memory: 4Gi
108+
cpu: 2
109+
110+
- name: pull-api-syncagent-test-e2e-kcp-0.28
111+
always_run: true
112+
decorate: true
113+
clone_uri: "https://github.com/kcp-dev/api-syncagent"
114+
labels:
115+
preset-goproxy: "true"
116+
spec:
117+
containers:
118+
- image: ghcr.io/kcp-dev/infra/build:1.24.3-1
119+
command:
120+
- hack/ci/run-e2e-tests.sh
121+
env:
122+
- name: KCP_VERSION
123+
value: '0.28.1'
102124
resources:
103125
requests:
104126
memory: 4Gi
105127
cpu: 2
106-
# docker-in-docker needs privileged mode
107-
securityContext:
108-
privileged: true

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ $(YQ):
9898
yq_*
9999

100100
KCP = _tools/kcp
101-
KCP_VERSION = 0.27.1
101+
KCP_VERSION ?= 0.28.1
102102

103103
.PHONY: $(KCP)
104104
$(KCP):

cmd/api-syncagent/kcp.go

Lines changed: 294 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,294 @@
1+
/*
2+
Copyright 2025 The KCP Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
"regexp"
24+
25+
"github.com/kcp-dev/logicalcluster/v3"
26+
27+
"github.com/kcp-dev/api-syncagent/internal/kcp"
28+
29+
kcpdevv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
30+
kcpdevcore "github.com/kcp-dev/kcp/sdk/apis/core"
31+
kcpdevcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
32+
33+
"k8s.io/apimachinery/pkg/fields"
34+
"k8s.io/apimachinery/pkg/runtime"
35+
"k8s.io/apimachinery/pkg/types"
36+
"k8s.io/client-go/rest"
37+
"sigs.k8s.io/controller-runtime/pkg/cache"
38+
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
39+
"sigs.k8s.io/controller-runtime/pkg/cluster"
40+
)
41+
42+
// The agent has two potentially different kcp clusters:
43+
//
44+
// endpointCluster - this is where the source of the virtual workspace URLs
45+
// live, i.e. where the APIExport/EndpointSlice.
46+
// managedCluster - this is where the APIExport and APIResourceSchemas
47+
// exist that are meant to be reconciled.
48+
//
49+
// The managedCluster always exists, the endpointCluster only if the workspace
50+
// for the virtual workspace source is different from the managed cluster.
51+
52+
// setupEndpointKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
53+
// that is solvely used to watch whichever object holds the virtual workspace URLs,
54+
// either the APIExport or the APIExportEndpointSlice.
55+
func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
56+
// no need for a dedicated endpoint cluster
57+
if endpoint.EndpointSlice == nil || endpoint.EndpointSlice.Cluster == endpoint.APIExport.Cluster {
58+
return nil, nil
59+
}
60+
61+
scheme := runtime.NewScheme()
62+
63+
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
64+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
65+
}
66+
67+
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
68+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
69+
}
70+
71+
// RBAC in kcp might be very tight and might not allow to list/watch all objects;
72+
// restrict the cache's selectors accordingly so we can still make use of caching.
73+
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
74+
&kcpdevv1alpha1.APIExportEndpointSlice{}: {
75+
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.EndpointSlice.Name}),
76+
},
77+
}
78+
79+
return cluster.New(endpoint.EndpointSlice.Config, func(o *cluster.Options) {
80+
o.Scheme = scheme
81+
o.Cache = cache.Options{
82+
Scheme: scheme,
83+
ByObject: byObject,
84+
}
85+
})
86+
}
87+
88+
// setupManagedKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
89+
// that is solvely used to manage the APIExport and APIResourceSchemas.
90+
func setupManagedKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
91+
scheme := runtime.NewScheme()
92+
93+
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
94+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
95+
}
96+
97+
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
98+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
99+
}
100+
101+
// RBAC in kcp might be very tight and might not allow to list/watch all objects;
102+
// restrict the cache's selectors accordingly so we can still make use of caching.
103+
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
104+
&kcpdevv1alpha1.APIExport{}: {
105+
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.APIExport.Name}),
106+
},
107+
}
108+
109+
return cluster.New(endpoint.APIExport.Config, func(o *cluster.Options) {
110+
o.Scheme = scheme
111+
o.Cache = cache.Options{
112+
Scheme: scheme,
113+
ByObject: byObject,
114+
}
115+
})
116+
}
117+
118+
type qualifiedCluster struct {
119+
Cluster logicalcluster.Name
120+
Path logicalcluster.Path
121+
Config *rest.Config
122+
}
123+
124+
type qualifiedAPIExport struct {
125+
*kcpdevv1alpha1.APIExport
126+
qualifiedCluster
127+
}
128+
129+
type qualifiedAPIExportEndpointSlice struct {
130+
*kcpdevv1alpha1.APIExportEndpointSlice
131+
qualifiedCluster
132+
}
133+
134+
type syncEndpoint struct {
135+
APIExport qualifiedAPIExport
136+
EndpointSlice *qualifiedAPIExportEndpointSlice
137+
}
138+
139+
// resolveSyncEndpoint takes the user provided (usually via CLI flags) APIExportEndpointSliceRef and
140+
// APIExportRef and resolves, returning a consistent SyncEndpoint. The initialRestConfig must point
141+
// to the cluster where either of the two objects reside (i.e. if the APIExportRef is given, it
142+
// must point to the cluster where the APIExport lives, and vice versa for the endpoint slice;
143+
// however the endpoint slice references an APIExport in potentially another cluster, and for this
144+
// case the initialRestConfig will be rewritten accordingly).
145+
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string, apiExportRef string) (*syncEndpoint, error) {
146+
// construct temporary, uncached client
147+
scheme := runtime.NewScheme()
148+
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
149+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
150+
}
151+
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
152+
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
153+
}
154+
155+
clientOpts := ctrlruntimeclient.Options{Scheme: scheme}
156+
client, err := ctrlruntimeclient.New(initialRestConfig, clientOpts)
157+
if err != nil {
158+
return nil, fmt.Errorf("failed to create service reader: %w", err)
159+
}
160+
161+
se := &syncEndpoint{}
162+
163+
// When an endpoint ref is given, both the APIExportEndpointSlice and the APIExport must exist.
164+
if endpointSliceRef != "" {
165+
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
166+
if err != nil {
167+
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
168+
}
169+
endpointSlice.Config = initialRestConfig
170+
171+
// find the APIExport referenced not by the user (can't: both ref parameters to this function
172+
// are mutually exclusive), but in the APIExportEndpointSlice.
173+
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
174+
if err != nil {
175+
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
176+
}
177+
178+
client, err := ctrlruntimeclient.New(restConfig, clientOpts)
179+
if err != nil {
180+
return nil, fmt.Errorf("failed to create service reader: %w", err)
181+
}
182+
183+
apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
184+
if err != nil {
185+
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
186+
}
187+
apiExport.Config = restConfig
188+
189+
se.APIExport = apiExport
190+
se.EndpointSlice = &endpointSlice
191+
} else { // if an export ref is given, the endpoint slice is optional (for compat with kcp <0.28)
192+
apiExport, err := resolveAPIExport(ctx, client, apiExportRef)
193+
if err != nil {
194+
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
195+
}
196+
apiExport.Config = initialRestConfig
197+
198+
se.APIExport = apiExport
199+
200+
// try to find an endpoint slice in the same workspace with the same name as the APIExport
201+
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, apiExportRef)
202+
if ctrlruntimeclient.IgnoreNotFound(err) != nil {
203+
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
204+
} else if err == nil {
205+
apiExport.Config = initialRestConfig
206+
se.EndpointSlice = &endpointSlice
207+
}
208+
}
209+
210+
return se, nil
211+
}
212+
213+
func resolveAPIExportEndpointSlice(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExportEndpointSlice, error) {
214+
endpointSlice := &kcpdevv1alpha1.APIExportEndpointSlice{}
215+
key := types.NamespacedName{Name: ref}
216+
if err := client.Get(ctx, key, endpointSlice); err != nil {
217+
return qualifiedAPIExportEndpointSlice{}, fmt.Errorf("failed to get APIExportEndpointSlice %q: %w", ref, err)
218+
}
219+
220+
lcName, lcPath, err := resolveCurrentCluster(ctx, client)
221+
if err != nil {
222+
return qualifiedAPIExportEndpointSlice{}, fmt.Errorf("failed to resolve APIExportEndpointSlice cluster: %w", err)
223+
}
224+
225+
return qualifiedAPIExportEndpointSlice{
226+
APIExportEndpointSlice: endpointSlice,
227+
qualifiedCluster: qualifiedCluster{
228+
Cluster: lcName,
229+
Path: lcPath,
230+
},
231+
}, nil
232+
}
233+
234+
func resolveAPIExport(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExport, error) {
235+
apiExport := &kcpdevv1alpha1.APIExport{}
236+
key := types.NamespacedName{Name: ref}
237+
if err := client.Get(ctx, key, apiExport); err != nil {
238+
return qualifiedAPIExport{}, fmt.Errorf("failed to get APIExport %q: %w", ref, err)
239+
}
240+
241+
lcName, lcPath, err := resolveCurrentCluster(ctx, client)
242+
if err != nil {
243+
return qualifiedAPIExport{}, fmt.Errorf("failed to resolve APIExport cluster: %w", err)
244+
}
245+
246+
return qualifiedAPIExport{
247+
APIExport: apiExport,
248+
qualifiedCluster: qualifiedCluster{
249+
Cluster: lcName,
250+
Path: lcPath,
251+
},
252+
}, nil
253+
}
254+
255+
func resolveCurrentCluster(ctx context.Context, client ctrlruntimeclient.Client) (logicalcluster.Name, logicalcluster.Path, error) {
256+
lc := &kcpdevcorev1alpha1.LogicalCluster{}
257+
if err := client.Get(ctx, types.NamespacedName{Name: kcp.IdentityClusterName}, lc); err != nil {
258+
return "", logicalcluster.None, fmt.Errorf("failed to resolve current workspace: %w", err)
259+
}
260+
261+
lcName := logicalcluster.From(lc)
262+
lcPath := logicalcluster.NewPath(lc.Annotations[kcpdevcore.LogicalClusterPathAnnotationKey])
263+
264+
return lcName, lcPath, nil
265+
}
266+
267+
var clusterFinder = regexp.MustCompile(`/clusters/([^/]+)`)
268+
269+
func retargetRestConfig(cfg *rest.Config, destination string) (*rest.Config, error) {
270+
// no change desired (use current cluster implicitly)
271+
if destination == "" {
272+
return cfg, nil
273+
}
274+
275+
matches := clusterFinder.FindAllStringSubmatch(cfg.Host, -1)
276+
if len(matches) == 0 {
277+
return nil, errors.New("URL must point to a cluster/workspace")
278+
}
279+
if len(matches) > 1 {
280+
return nil, errors.New("invalid URL: URL contains more than one cluster path")
281+
}
282+
283+
current := matches[0][1]
284+
if current == destination {
285+
return cfg, nil
286+
}
287+
288+
newCluster := fmt.Sprintf("/clusters/%s", destination)
289+
290+
newConfig := rest.CopyConfig(cfg)
291+
newConfig.Host = clusterFinder.ReplaceAllString(cfg.Host, newCluster)
292+
293+
return newConfig, nil
294+
}

0 commit comments

Comments
 (0)