Skip to content

Commit 4711cef

Browse files
committed
Do not block on sync controller start and engage existing clusters
On-behalf-of: SAP <marvin.beckers@sap.com> Signed-off-by: Marvin Beckers <marvin@kubermatic.com>
1 parent 1052e57 commit 4711cef

File tree

3 files changed

+243
-26
lines changed

3 files changed

+243
-26
lines changed

internal/controller/apiresourceschema/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (r *Reconciler) reconcile(ctx context.Context, log *zap.SugaredLogger, pubR
174174
if apierrors.IsNotFound(err) {
175175
ars, err := kcp.CreateAPIResourceSchema(projectedCRD, arsName, r.agentName)
176176
if err != nil {
177-
return nil, fmt.Errorf("failed to create APIResourceSchema: %w", err)
177+
return nil, fmt.Errorf("failed to construct APIResourceSchema: %w", err)
178178
}
179179

180180
log.With("name", arsName).Info("Creating APIResourceSchema…")

internal/controller/syncmanager/controller.go

Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@ package syncmanager
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
23+
"sync"
2224

2325
"go.uber.org/zap"
2426

25-
"github.com/kcp-dev/api-syncagent/internal/controller/sync"
27+
controllersync "github.com/kcp-dev/api-syncagent/internal/controller/sync"
2628
"github.com/kcp-dev/api-syncagent/internal/controllerutil"
2729
"github.com/kcp-dev/api-syncagent/internal/controllerutil/predicate"
2830
"github.com/kcp-dev/api-syncagent/internal/discovery"
@@ -91,12 +93,20 @@ type Reconciler struct {
9193

9294
// The provider based on the APIExport; like the vwManager, this is stopped and recreated
9395
// whenever the APIExport's URL changes.
94-
vwProvider *apiexportprovider.Provider
96+
providerOnce sync.Once
97+
vwProvider *apiexportprovider.Provider
9598

99+
syncWorkersLock sync.RWMutex
96100
// A map of sync controllers, one for each PublishedResource, using their
97101
// UIDs and resourceVersion as the map keys; using the version ensures that
98102
// when a PR changes, the old controller is orphaned and will be shut down.
99103
syncWorkers map[string]syncWorker
104+
105+
clustersLock sync.RWMutex
106+
// A map of clusters that have been engaged with the shim layer. Since this
107+
// reconciler dynamically starts and stops controllers, we need to keep track
108+
// of clusters and engage them with sync controllers started at a later point in time.
109+
clusters map[string]engagedCluster
100110
}
101111

102112
type syncWorker struct {
@@ -135,7 +145,14 @@ func Add(
135145
prFilter: prFilter,
136146
stateNamespace: stateNamespace,
137147
agentName: agentName,
148+
149+
providerOnce: sync.Once{},
150+
151+
syncWorkersLock: sync.RWMutex{},
138152
syncWorkers: map[string]syncWorker{},
153+
154+
clustersLock: sync.RWMutex{},
155+
clusters: make(map[string]engagedCluster),
139156
}
140157

141158
bldr := builder.ControllerManagedBy(localManager).
@@ -161,7 +178,7 @@ func Add(
161178
return err
162179
}
163180

164-
func (r *Reconciler) Reconcile(ctx context.Context, _ reconcile.Request) (reconcile.Result, error) {
181+
func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
165182
log := r.log.Named(ControllerName)
166183
log.Debug("Processing")
167184

@@ -232,9 +249,11 @@ func (r *Reconciler) reconcile(ctx context.Context, log *zap.SugaredLogger, vwUR
232249
}
233250

234251
func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
235-
// Use the global app context so this provider is independent of the reconcile
236-
// context, which might get cancelled right after Reconcile() is done.
237-
r.vwManagerCtx, r.vwManagerCancel = context.WithCancel(r.ctx)
252+
if r.vwManagerCtx == nil {
253+
// Use the global app context so this provider is independent of the reconcile
254+
// context, which might get cancelled right after Reconcile() is done.
255+
r.vwManagerCtx, r.vwManagerCancel = context.WithCancel(r.ctx)
256+
}
238257

239258
vwConfig := rest.CopyConfig(r.kcpRestConfig)
240259
vwConfig.Host = vwURL
@@ -303,23 +322,47 @@ func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
303322
r.vwManager = manager
304323
}
305324

306-
// start the provider
307-
go func() {
308-
// Use the global app context so this provider is independent of the reconcile
309-
// context, which might get cancelled right after Reconcile() is done.
310-
if err := r.vwProvider.Run(r.vwManagerCtx, r.vwManager); err != nil {
311-
log.Fatalw("Failed to start apiexport provider.", zap.Error(err))
312-
}
313-
}()
325+
r.providerOnce.Do(func() {
326+
log.Debug("Starting virtual workspace provider…")
327+
// start the provider
328+
go func() {
329+
// Use the global app context so this provider is independent of the reconcile
330+
// context, which might get cancelled right after Reconcile() is done.
331+
if err := r.vwProvider.Run(r.vwManagerCtx, r.vwManager); err != nil {
332+
log.Fatalw("Failed to start apiexport provider", zap.Error(err))
333+
}
334+
}()
335+
})
314336

315337
return nil
316338
}
317339

340+
type engagedCluster struct {
341+
ctx context.Context
342+
cl cluster.Cluster
343+
}
344+
318345
type controllerShim struct {
319346
reconciler *Reconciler
320347
}
321348

322349
func (s *controllerShim) Engage(ctx context.Context, clusterName string, cl cluster.Cluster) error {
350+
if _, ok := s.reconciler.clusters[clusterName]; !ok {
351+
s.reconciler.clustersLock.Lock()
352+
s.reconciler.clusters[clusterName] = engagedCluster{ctx: ctx, cl: cl}
353+
s.reconciler.clustersLock.Unlock()
354+
355+
// start a goroutine to make sure we remove the cluster when the context is done
356+
go func() {
357+
<-ctx.Done()
358+
s.reconciler.clustersLock.Lock()
359+
delete(s.reconciler.clusters, clusterName)
360+
s.reconciler.clustersLock.Unlock()
361+
}()
362+
}
363+
364+
s.reconciler.syncWorkersLock.RLock()
365+
defer s.reconciler.syncWorkersLock.RUnlock()
323366
for _, worker := range s.reconciler.syncWorkers {
324367
if err := worker.controller.Engage(ctx, clusterName, cl); err != nil {
325368
return err
@@ -348,10 +391,17 @@ func (r *Reconciler) shutdown(log *zap.SugaredLogger) {
348391
r.vwManagerCtx = nil
349392
r.vwManagerCancel = nil
350393
r.vwURL = ""
394+
r.providerOnce = sync.Once{}
395+
396+
r.clustersLock.Lock()
397+
r.clusters = make(map[string]engagedCluster)
398+
r.clustersLock.Unlock()
351399

352400
// Free all workers; since their contexts are based on the manager's context,
353401
// they have also been cancelled already above.
354-
r.syncWorkers = nil
402+
r.syncWorkersLock.Lock()
403+
r.syncWorkers = make(map[string]syncWorker)
404+
r.syncWorkersLock.Unlock()
355405
}
356406

357407
func getPublishedResourceKey(pr *syncagentv1alpha1.PublishedResource) string {
@@ -373,7 +423,10 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
373423
log.Infow("Stopping sync controller…", "key", key)
374424

375425
worker.cancel()
426+
427+
r.syncWorkersLock.Lock()
376428
delete(r.syncWorkers, key)
429+
r.syncWorkersLock.Unlock()
377430
}
378431

379432
// start missing controllers
@@ -386,13 +439,14 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
386439
continue
387440
}
388441

389-
log.Infow("Starting new sync controller…", "key", key)
390-
442+
prlog := log.With("key", key, "name", pubRes.Name)
391443
ctrlCtx, ctrlCancel := context.WithCancel(r.vwManagerCtx)
392444

445+
prlog.Info("Creating new sync controller…")
446+
393447
// create the sync controller;
394448
// use the reconciler's log without any additional reconciling context
395-
syncController, err := sync.Create(
449+
syncController, err := controllersync.Create(
396450
// This can be the reconciling context, as it's only used to find the target CRD during setup;
397451
// this context *must not* be stored in the sync controller!
398452
ctx,
@@ -410,18 +464,33 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
410464
return fmt.Errorf("failed to create sync controller: %w", err)
411465
}
412466

413-
log.Infof("storing worker at %s", key)
467+
r.syncWorkersLock.Lock()
414468
r.syncWorkers[key] = syncWorker{
415469
controller: syncController,
416470
cancel: ctrlCancel,
417471
}
472+
r.syncWorkersLock.Unlock()
418473

419-
// let 'er rip (remember to use the long-lived context here)
420-
if err := syncController.Start(ctrlCtx); err != nil {
421-
ctrlCancel()
422-
log.Info("deleting again")
474+
go func() {
475+
log.Infow("Starting sync controller…", "key", key)
476+
if err := syncController.Start(ctrlCtx); err != nil && !errors.Is(err, context.Canceled) {
477+
ctrlCancel()
478+
prlog.Errorw("failed to start sync controller", zap.Error(err))
479+
}
480+
481+
prlog.Debug("Stopped sync controller")
482+
483+
r.syncWorkersLock.Lock()
423484
delete(r.syncWorkers, key)
424-
return fmt.Errorf("failed to start sync controller: %w", err)
485+
r.syncWorkersLock.Unlock()
486+
}()
487+
488+
r.clustersLock.RLock()
489+
defer r.clustersLock.RUnlock()
490+
for name, ec := range r.clusters {
491+
if err := syncController.Engage(ec.ctx, name, ec.cl); err != nil {
492+
prlog.Errorw("failed to engage cluster", zap.Error(err), "cluster", name)
493+
}
425494
}
426495
}
427496

test/e2e/sync/primary_test.go

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestSyncSimpleObject(t *testing.T) {
6060
"test/crds/crontab.yaml",
6161
})
6262

63-
// publish Crontabs and Backups
63+
// publish Crontabs
6464
t.Logf("Publishing CRDs…")
6565
prCrontabs := &syncagentv1alpha1.PublishedResource{
6666
ObjectMeta: metav1.ObjectMeta{
@@ -839,3 +839,151 @@ spec:
839839
t.Fatalf("Expected %s annotation to be %q, but is %q.", ann, teamClusterPath.String(), value)
840840
}
841841
}
842+
843+
func TestSyncMultiResources(t *testing.T) {
844+
const (
845+
apiExportName = "kcp.example.com"
846+
kcpGroupName = "kcp.example.com"
847+
orgWorkspace = "sync-multi-resources"
848+
)
849+
850+
ctx := t.Context()
851+
ctrlruntime.SetLogger(logr.Discard())
852+
853+
// setup a test environment in kcp
854+
orgKubconfig := utils.CreateOrganization(t, ctx, orgWorkspace, apiExportName)
855+
856+
// start a service cluster
857+
envtestKubeconfig, envtestClient, _ := utils.RunEnvtest(t, []string{
858+
"test/crds/crontab.yaml",
859+
})
860+
861+
// publish Crontabs and ConfigMaps
862+
t.Logf("Publishing CRDs…")
863+
prCrontabs := &syncagentv1alpha1.PublishedResource{
864+
ObjectMeta: metav1.ObjectMeta{
865+
Name: "publish-crontabs",
866+
},
867+
Spec: syncagentv1alpha1.PublishedResourceSpec{
868+
Resource: syncagentv1alpha1.SourceResourceDescriptor{
869+
APIGroup: "example.com",
870+
Version: "v1",
871+
Kind: "CronTab",
872+
},
873+
// These rules make finding the local object easier, but should not be used in production.
874+
Naming: &syncagentv1alpha1.ResourceNaming{
875+
Name: "{{ .Object.metadata.name }}",
876+
Namespace: "synced-{{ .Object.metadata.namespace }}",
877+
},
878+
Projection: &syncagentv1alpha1.ResourceProjection{
879+
Group: kcpGroupName,
880+
},
881+
},
882+
}
883+
884+
prConfigMaps := &syncagentv1alpha1.PublishedResource{
885+
ObjectMeta: metav1.ObjectMeta{
886+
Name: "publish-configmaps",
887+
},
888+
Spec: syncagentv1alpha1.PublishedResourceSpec{
889+
Resource: syncagentv1alpha1.SourceResourceDescriptor{
890+
APIGroup: "",
891+
Version: "v1",
892+
Kind: "ConfigMap",
893+
},
894+
// These rules make finding the local object easier, but should not be used in production.
895+
Naming: &syncagentv1alpha1.ResourceNaming{
896+
Name: "{{ .Object.metadata.name }}",
897+
Namespace: "synced-{{ .Object.metadata.namespace }}",
898+
},
899+
Projection: &syncagentv1alpha1.ResourceProjection{
900+
Group: kcpGroupName,
901+
Kind: "KubeConfigMap",
902+
},
903+
},
904+
}
905+
906+
if err := envtestClient.Create(ctx, prCrontabs); err != nil {
907+
t.Fatalf("Failed to create PublishedResource for CronTabs: %v", err)
908+
}
909+
910+
if err := envtestClient.Create(ctx, prConfigMaps); err != nil {
911+
t.Fatalf("Failed to create PublishedResource for ConfigMaps: %v", err)
912+
}
913+
914+
// start the agent in the background to update the APIExport with the CronTabs API
915+
utils.RunAgent(ctx, t, "bob", orgKubconfig, envtestKubeconfig, apiExportName)
916+
917+
// wait until the API is available
918+
kcpClusterClient := utils.GetKcpAdminClusterClient(t)
919+
920+
teamClusterPath := logicalcluster.NewPath("root").Join(orgWorkspace).Join("team-1")
921+
teamClient := kcpClusterClient.Cluster(teamClusterPath)
922+
923+
utils.WaitForBoundAPI(t, ctx, teamClient, schema.GroupVersionResource{
924+
Group: kcpGroupName,
925+
Version: "v1",
926+
Resource: "crontabs",
927+
})
928+
929+
// create a Crontab object in a team workspace
930+
t.Log("Creating CronTab in kcp…")
931+
crontab := utils.YAMLToUnstructured(t, `
932+
apiVersion: kcp.example.com/v1
933+
kind: CronTab
934+
metadata:
935+
namespace: default
936+
name: my-crontab
937+
spec:
938+
cronSpec: '* * *'
939+
image: ubuntu:latest
940+
`)
941+
942+
if err := teamClient.Create(ctx, crontab); err != nil {
943+
t.Fatalf("Failed to create CronTab in kcp: %v", err)
944+
}
945+
946+
// wait for the agent to sync the object down into the service cluster
947+
948+
t.Logf("Wait for CronTab to be synced…")
949+
copy := &unstructured.Unstructured{}
950+
copy.SetAPIVersion("example.com/v1")
951+
copy.SetKind("CronTab")
952+
953+
err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) {
954+
copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-crontab"}
955+
return envtestClient.Get(ctx, copyKey, copy) == nil, nil
956+
})
957+
if err != nil {
958+
t.Fatalf("Failed to wait for CronTab object to be synced down: %v", err)
959+
}
960+
961+
// create a ConfigMap object in a team workspace
962+
t.Log("Creating KubeConfigMap in kcp…")
963+
configmap := utils.YAMLToUnstructured(t, `
964+
apiVersion: kcp.example.com/v1
965+
kind: KubeConfigMap
966+
metadata:
967+
namespace: default
968+
name: my-configmap
969+
data:
970+
dummydata: dummydata
971+
`)
972+
973+
if err := teamClient.Create(ctx, configmap); err != nil {
974+
t.Fatalf("Failed to create KubeConfigMap in kcp: %v", err)
975+
}
976+
977+
// wait for the agent to sync the object down into the service cluster
978+
979+
t.Logf("Wait for KubeConfigMap to be synced…")
980+
copyCM := &corev1.ConfigMap{}
981+
982+
err = wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, 30*time.Second, false, func(ctx context.Context) (done bool, err error) {
983+
copyKey := types.NamespacedName{Namespace: "synced-default", Name: "my-configmap"}
984+
return envtestClient.Get(ctx, copyKey, copyCM) == nil, nil
985+
})
986+
if err != nil {
987+
t.Fatalf("Failed to wait for ConfigMap object to be synced down: %v", err)
988+
}
989+
}

0 commit comments

Comments
 (0)