From fffdf3266ad7d2350df9d55b29934f3582841186 Mon Sep 17 00:00:00 2001 From: david kydd Date: Tue, 9 Dec 2025 12:29:49 +1300 Subject: [PATCH 1/9] initial bite at overlaysync controller plus inputmirror CRD --- .../config/crd/eno.azure.io_inputmirrors.yaml | 168 +++++++ .../config/crd/eno.azure.io_symphonies.yaml | 81 ++++ api/v1/inputmirror.go | 111 +++++ api/v1/symphony.go | 12 + api/v1/types.go | 1 + api/v1/zz_generated.deepcopy.go | 171 +++++++ cmd/eno-controller/main.go | 6 + .../controllers/overlaysync/controller.go | 454 ++++++++++++++++++ .../overlaysync/controller_test.go | 172 +++++++ 9 files changed, 1176 insertions(+) create mode 100644 api/v1/config/crd/eno.azure.io_inputmirrors.yaml create mode 100644 api/v1/inputmirror.go create mode 100644 internal/controllers/overlaysync/controller.go create mode 100644 internal/controllers/overlaysync/controller_test.go diff --git a/api/v1/config/crd/eno.azure.io_inputmirrors.yaml b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml new file mode 100644 index 00000000..7ec44326 --- /dev/null +++ b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml @@ -0,0 +1,168 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.18.0 + name: inputmirrors.eno.azure.io +spec: + group: eno.azure.io + names: + kind: InputMirror + listKind: InputMirrorList + plural: inputmirrors + singular: inputmirror + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.sourceResource.name + name: Source + type: string + - jsonPath: .status.conditions[?(@.type=="Synced")].status + name: Synced + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1 + schema: + openAPIV3Schema: + description: |- + InputMirror stores a copy of a resource from an overlay cluster. + It is created and managed by the OverlaySyncController based on Symphony.spec.overlayResourceRefs. + Compositions can bind to InputMirrors just like any other resource. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + type: string + metadata: + type: object + spec: + properties: + key: + description: Key matches the Symphony's overlayResourceRef key + type: string + symphonyRef: + description: SymphonyRef points to the owning Symphony + properties: + name: + description: Name of the referent. + type: string + required: + - name + type: object + sourceResource: + description: SourceResource describes what resource to sync from + the overlay + properties: + group: + description: API Group of the resource (empty string for core + API group) + type: string + kind: + description: Kind of the resource (e.g., ConfigMap, Secret) + type: string + name: + description: Name of the resource + type: string + namespace: + description: Namespace of the resource (empty for cluster-scoped + resources) + type: string + version: + description: API Version of the resource + type: string + required: + - kind + - name + - version + type: object + required: + - key + - sourceResource + - symphonyRef + type: object + status: + properties: + conditions: + description: Conditions describe the sync state + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: lastTransitionTime is the last time the condition + transitioned from one status to another. + format: date-time + type: string + message: + description: message is a human readable message indicating + details about the transition. + maxLength: 32768 + type: string + observedGeneration: + description: observedGeneration represents the .metadata.generation + that the condition was set based upon. + format: int64 + minimum: 0 + type: integer + reason: + description: reason contains a programmatic identifier indicating + the reason for the condition's last transition. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + data: + description: |- + Data contains the actual resource data from the overlay cluster. + This is the full resource serialized as JSON. + type: object + x-kubernetes-preserve-unknown-fields: true + lastSyncTime: + description: LastSyncTime records when the resource was last successfully + synced + format: date-time + type: string + syncGeneration: + description: SyncGeneration tracks the source resource's resourceVersion + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/api/v1/config/crd/eno.azure.io_symphonies.yaml b/api/v1/config/crd/eno.azure.io_symphonies.yaml index 7242466c..785aeef3 100644 --- a/api/v1/config/crd/eno.azure.io_symphonies.yaml +++ b/api/v1/config/crd/eno.azure.io_symphonies.yaml @@ -72,6 +72,87 @@ spec: - resource type: object type: array + overlayCredentials: + description: |- + OverlayCredentials specifies how to access the overlay cluster. + When set, the OverlaySyncController will use these credentials to sync + resources specified in OverlayResourceRefs. + properties: + key: + default: kubeconfig + description: Key within the secret containing the kubeconfig data + type: string + secretRef: + description: SecretRef references a Secret containing the kubeconfig + for the overlay cluster + properties: + name: + description: name is unique within a namespace to reference + a secret resource. + type: string + namespace: + description: namespace defines the space within which the + secret name must be unique. + type: string + type: object + required: + - secretRef + type: object + overlayResourceRefs: + description: |- + OverlayResourceRefs specifies resources to sync from the overlay cluster. + Each ref results in an InputMirror being created that can be bound as an input. + items: + description: OverlayResourceRef defines a resource to sync from + an overlay cluster + properties: + key: + description: |- + Key that will be used to reference this input in Composition bindings. + This key maps to an auto-created InputMirror resource. + type: string + optional: + default: false + description: Optional indicates that synthesis can proceed if + this resource doesn't exist in the overlay. + type: boolean + resource: + description: Resource specifies what to fetch from the overlay + cluster + properties: + group: + description: API Group of the resource (empty string for + core API group) + type: string + kind: + description: Kind of the resource (e.g., ConfigMap, Secret) + type: string + name: + description: Name of the resource + type: string + namespace: + description: Namespace of the resource (empty for cluster-scoped + resources) + type: string + version: + description: API Version of the resource + type: string + required: + - kind + - name + - version + type: object + syncInterval: + default: 5m + description: SyncInterval determines how often to re-sync the + resource. + type: string + required: + - key + - resource + type: object + maxItems: 20 + type: array synthesisEnv: description: |- SynthesisEnv diff --git a/api/v1/inputmirror.go b/api/v1/inputmirror.go new file mode 100644 index 00000000..a91de7af --- /dev/null +++ b/api/v1/inputmirror.go @@ -0,0 +1,111 @@ +package v1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// +kubebuilder:object:root=true +type InputMirrorList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []InputMirror `json:"items"` +} + +// InputMirror stores a copy of a resource from an overlay cluster. +// It is created and managed by the OverlaySyncController based on Symphony.spec.overlayResourceRefs. +// Compositions can bind to InputMirrors just like any other resource. +// +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Source",type=string,JSONPath=`.spec.sourceResource.name` +// +kubebuilder:printcolumn:name="Synced",type=string,JSONPath=`.status.conditions[?(@.type=="Synced")].status` +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` +type InputMirror struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec InputMirrorSpec `json:"spec,omitempty"` + Status InputMirrorStatus `json:"status,omitempty"` +} + +type InputMirrorSpec struct { + // Key matches the Symphony's overlayResourceRef key + Key string `json:"key"` + + // SymphonyRef points to the owning Symphony + SymphonyRef corev1.LocalObjectReference `json:"symphonyRef"` + + // SourceResource describes what resource to sync from the overlay + SourceResource OverlayResourceSelector `json:"sourceResource"` +} + +type InputMirrorStatus struct { + // Data contains the actual resource data from the overlay cluster. + // This is the full resource serialized as JSON. + // +kubebuilder:pruning:PreserveUnknownFields + Data *runtime.RawExtension `json:"data,omitempty"` + + // LastSyncTime records when the resource was last successfully synced + LastSyncTime *metav1.Time `json:"lastSyncTime,omitempty"` + + // SyncGeneration tracks the source resource's resourceVersion + SyncGeneration string `json:"syncGeneration,omitempty"` + + // Conditions describe the sync state + // +listType=map + // +listMapKey=type + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + +// OverlayResourceSelector describes a resource to sync from an overlay cluster +type OverlayResourceSelector struct { + // API Group of the resource (empty string for core API group) + // +optional + Group string `json:"group,omitempty"` + + // API Version of the resource + Version string `json:"version"` + + // Kind of the resource (e.g., ConfigMap, Secret) + Kind string `json:"kind"` + + // Name of the resource + Name string `json:"name"` + + // Namespace of the resource (empty for cluster-scoped resources) + // +optional + Namespace string `json:"namespace,omitempty"` +} + +// OverlayCredentials specifies how to access an overlay cluster +type OverlayCredentials struct { + // SecretRef references a Secret containing the kubeconfig for the overlay cluster + SecretRef corev1.SecretReference `json:"secretRef"` + + // Key within the secret containing the kubeconfig data + // +kubebuilder:default="kubeconfig" + // +optional + Key string `json:"key,omitempty"` +} + +// OverlayResourceRef defines a resource to sync from an overlay cluster +type OverlayResourceRef struct { + // Key that will be used to reference this input in Composition bindings. + // This key maps to an auto-created InputMirror resource. + Key string `json:"key"` + + // Resource specifies what to fetch from the overlay cluster + Resource OverlayResourceSelector `json:"resource"` + + // SyncInterval determines how often to re-sync the resource. + // +kubebuilder:default="5m" + // +optional + SyncInterval *metav1.Duration `json:"syncInterval,omitempty"` + + // Optional indicates that synthesis can proceed if this resource doesn't exist in the overlay. + // +kubebuilder:default=false + // +optional + Optional bool `json:"optional,omitempty"` +} diff --git a/api/v1/symphony.go b/api/v1/symphony.go index 83cc27ef..94c3ccd3 100644 --- a/api/v1/symphony.go +++ b/api/v1/symphony.go @@ -39,6 +39,18 @@ type SymphonySpec struct { // Copied opaquely into the compositions managed by this symphony. // +kubebuilder:validation:MaxItems:=50 SynthesisEnv []EnvVar `json:"synthesisEnv,omitempty"` // deprecated synthesis env should always be variation scoped. + + // OverlayCredentials specifies how to access the overlay cluster. + // When set, the OverlaySyncController will use these credentials to sync + // resources specified in OverlayResourceRefs. + // +optional + OverlayCredentials *OverlayCredentials `json:"overlayCredentials,omitempty"` + + // OverlayResourceRefs specifies resources to sync from the overlay cluster. + // Each ref results in an InputMirror being created that can be bound as an input. + // +optional + // +kubebuilder:validation:MaxItems:=20 + OverlayResourceRefs []OverlayResourceRef `json:"overlayResourceRefs,omitempty"` } type SymphonyStatus struct { diff --git a/api/v1/types.go b/api/v1/types.go index e5fdb085..20e69935 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -19,4 +19,5 @@ func init() { SchemeBuilder.Register(&CompositionList{}, &Composition{}) SchemeBuilder.Register(&SymphonyList{}, &Symphony{}) SchemeBuilder.Register(&ResourceSliceList{}, &ResourceSlice{}) + SchemeBuilder.Register(&InputMirrorList{}, &InputMirror{}) } diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 5a8d0e02..ab362c98 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -582,6 +582,18 @@ func (in *SymphonySpec) DeepCopyInto(out *SymphonySpec) { *out = make([]EnvVar, len(*in)) copy(*out, *in) } + if in.OverlayCredentials != nil { + in, out := &in.OverlayCredentials, &out.OverlayCredentials + *out = new(OverlayCredentials) + **out = **in + } + if in.OverlayResourceRefs != nil { + in, out := &in.OverlayResourceRefs, &out.OverlayResourceRefs + *out = make([]OverlayResourceRef, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SymphonySpec. @@ -845,3 +857,162 @@ func (in *Variation) DeepCopy() *Variation { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirror) DeepCopyInto(out *InputMirror) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirror. +func (in *InputMirror) DeepCopy() *InputMirror { + if in == nil { + return nil + } + out := new(InputMirror) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirror) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorList) DeepCopyInto(out *InputMirrorList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]InputMirror, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorList. +func (in *InputMirrorList) DeepCopy() *InputMirrorList { + if in == nil { + return nil + } + out := new(InputMirrorList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirrorList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorSpec) DeepCopyInto(out *InputMirrorSpec) { + *out = *in + out.SymphonyRef = in.SymphonyRef + out.SourceResource = in.SourceResource +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorSpec. +func (in *InputMirrorSpec) DeepCopy() *InputMirrorSpec { + if in == nil { + return nil + } + out := new(InputMirrorSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorStatus) DeepCopyInto(out *InputMirrorStatus) { + *out = *in + if in.Data != nil { + in, out := &in.Data, &out.Data + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.LastSyncTime != nil { + in, out := &in.LastSyncTime, &out.LastSyncTime + *out = (*in).DeepCopy() + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorStatus. +func (in *InputMirrorStatus) DeepCopy() *InputMirrorStatus { + if in == nil { + return nil + } + out := new(InputMirrorStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OverlayResourceSelector) DeepCopyInto(out *OverlayResourceSelector) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceSelector. +func (in *OverlayResourceSelector) DeepCopy() *OverlayResourceSelector { + if in == nil { + return nil + } + out := new(OverlayResourceSelector) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OverlayCredentials) DeepCopyInto(out *OverlayCredentials) { + *out = *in + out.SecretRef = in.SecretRef +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayCredentials. +func (in *OverlayCredentials) DeepCopy() *OverlayCredentials { + if in == nil { + return nil + } + out := new(OverlayCredentials) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OverlayResourceRef) DeepCopyInto(out *OverlayResourceRef) { + *out = *in + out.Resource = in.Resource + if in.SyncInterval != nil { + in, out := &in.SyncInterval, &out.SyncInterval + *out = new(metav1.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceRef. +func (in *OverlayResourceRef) DeepCopy() *OverlayResourceRef { + if in == nil { + return nil + } + out := new(OverlayResourceRef) + in.DeepCopyInto(out) + return out +} diff --git a/cmd/eno-controller/main.go b/cmd/eno-controller/main.go index f0135384..7e532b52 100644 --- a/cmd/eno-controller/main.go +++ b/cmd/eno-controller/main.go @@ -19,6 +19,7 @@ import ( v1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/controllers/composition" + "github.com/Azure/eno/internal/controllers/overlaysync" "github.com/Azure/eno/internal/controllers/resourceslice" "github.com/Azure/eno/internal/controllers/scheduling" "github.com/Azure/eno/internal/controllers/symphony" @@ -170,6 +171,11 @@ func runController() error { return fmt.Errorf("constructing symphony controller: %w", err) } + err = overlaysync.NewController(mgr) + if err != nil { + return fmt.Errorf("constructing overlay sync controller: %w", err) + } + return mgr.Start(ctx) } diff --git a/internal/controllers/overlaysync/controller.go b/internal/controllers/overlaysync/controller.go new file mode 100644 index 00000000..6a0ad91e --- /dev/null +++ b/internal/controllers/overlaysync/controller.go @@ -0,0 +1,454 @@ +// Package overlaysync implements the OverlaySyncController which syncs resources +// from overlay clusters to the underlay as InputMirror resources. +// +// SECURITY CONSIDERATIONS: +// - Overlay credentials are stored in Secrets and never logged +// - Secret access is restricted to the Symphony's namespace by default +// - REST client has timeouts to prevent resource exhaustion +// - Cached clients are invalidated on credential rotation +// - Only specified resource types can be synced (no arbitrary access) +package overlaysync + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "sync" + "time" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/clientcmd" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +const ( + // ConditionTypeSynced indicates whether the InputMirror has been successfully synced + ConditionTypeSynced = "Synced" + + // DefaultSyncInterval is the default interval for re-syncing overlay resources + DefaultSyncInterval = 5 * time.Minute + + // FinalizerName is the finalizer added to InputMirrors + FinalizerName = "eno.azure.io/overlay-sync" + + // Client timeout settings for security + overlayClientTimeout = 30 * time.Second + overlayClientQPS = 5 + overlayClientBurst = 10 +) + +// AllowedSyncKinds defines which resource kinds can be synced from overlay. +// This is a security control to prevent syncing sensitive resources. +var AllowedSyncKinds = map[schema.GroupKind]bool{ + {Group: "", Kind: "ConfigMap"}: true, + // Add other allowed kinds here as needed + // Explicitly NOT allowing: Secret, ServiceAccount, etc. +} + +// overlayClient holds a cached client for an overlay cluster +type overlayClient struct { + client client.Client + createdAt time.Time + credentialHash string // Hash of credentials to detect rotation +} + +// Controller reconciles Symphonies with overlay resource refs, syncing resources +// from overlay clusters to InputMirror resources on the underlay. +type Controller struct { + client client.Client + scheme *runtime.Scheme + + // overlayClients caches overlay cluster clients keyed by symphony namespace/name + overlayClients sync.Map + + // clientCacheTTL determines how long overlay clients are cached + clientCacheTTL time.Duration + + // allowedKinds can be overridden for testing + allowedKinds map[schema.GroupKind]bool +} + +// NewController creates a new OverlaySyncController and registers it with the manager. +func NewController(mgr ctrl.Manager) error { + c := &Controller{ + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + clientCacheTTL: 10 * time.Minute, + allowedKinds: AllowedSyncKinds, + } + + return ctrl.NewControllerManagedBy(mgr). + For(&apiv1.Symphony{}). + Owns(&apiv1.InputMirror{}). + WithLogConstructor(manager.NewLogConstructor(mgr, "overlaySyncController")). + Complete(c) +} + +func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := logr.FromContextOrDiscard(ctx) + + symphony := &apiv1.Symphony{} + if err := c.client.Get(ctx, req.NamespacedName, symphony); err != nil { + if errors.IsNotFound(err) { + // Symphony deleted, overlay clients will be cleaned up by GC + c.overlayClients.Delete(req.String()) + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + logger = logger.WithValues( + "symphonyName", symphony.Name, + "symphonyNamespace", symphony.Namespace, + ) + ctx = logr.NewContext(ctx, logger) + + // Skip if no overlay resource refs defined + if len(symphony.Spec.OverlayResourceRefs) == 0 { + return ctrl.Result{}, nil + } + + // Skip if no overlay credentials provided + if symphony.Spec.OverlayCredentials == nil { + logger.V(1).Info("symphony has overlay resource refs but no credentials, skipping") + return ctrl.Result{}, nil + } + + // Handle deletion + if symphony.DeletionTimestamp != nil { + // InputMirrors will be garbage collected via owner references + c.overlayClients.Delete(req.String()) + return ctrl.Result{}, nil + } + + // Get or create overlay client + overlayClient, err := c.getOrCreateOverlayClient(ctx, symphony) + if err != nil { + logger.Error(err, "failed to create overlay client") + return ctrl.Result{RequeueAfter: time.Minute}, nil + } + + // Sync each overlay resource ref + var minRequeue time.Duration + for _, ref := range symphony.Spec.OverlayResourceRefs { + requeue, err := c.syncOverlayResource(ctx, symphony, overlayClient, ref) + if err != nil { + logger.Error(err, "failed to sync overlay resource", "key", ref.Key) + // Continue with other refs + } + if requeue > 0 && (minRequeue == 0 || requeue < minRequeue) { + minRequeue = requeue + } + } + + // Clean up InputMirrors for refs that no longer exist + if err := c.cleanupOrphanedMirrors(ctx, symphony); err != nil { + logger.Error(err, "failed to cleanup orphaned mirrors") + } + + if minRequeue > 0 { + return ctrl.Result{RequeueAfter: minRequeue}, nil + } + return ctrl.Result{RequeueAfter: DefaultSyncInterval}, nil +} + +// hashCredentials creates a SHA256 hash of credential data for change detection. +// This allows detecting credential rotation without storing the credentials. +func hashCredentials(data []byte) string { + h := sha256.Sum256(data) + return hex.EncodeToString(h[:]) +} + +// getOrCreateOverlayClient gets a cached overlay client or creates a new one. +// Security: Credentials are never logged, client has timeouts, cache invalidates on rotation. +func (c *Controller) getOrCreateOverlayClient(ctx context.Context, symphony *apiv1.Symphony) (client.Client, error) { + logger := logr.FromContextOrDiscard(ctx) + key := fmt.Sprintf("%s/%s", symphony.Namespace, symphony.Name) + + // Get the kubeconfig secret first to check for credential rotation + creds := symphony.Spec.OverlayCredentials + secret := &corev1.Secret{} + secretKey := types.NamespacedName{ + Name: creds.SecretRef.Name, + Namespace: creds.SecretRef.Namespace, + } + + // SECURITY: Only allow accessing secrets in the Symphony's namespace + // This prevents cross-namespace credential access + if secretKey.Namespace == "" { + secretKey.Namespace = symphony.Namespace + } + if secretKey.Namespace != symphony.Namespace { + return nil, fmt.Errorf("security: credential secret must be in symphony namespace %q, got %q", + symphony.Namespace, secretKey.Namespace) + } + + if err := c.client.Get(ctx, secretKey, secret); err != nil { + return nil, fmt.Errorf("getting overlay credentials secret: %w", err) + } + + // Get kubeconfig data - NEVER log this + kubeconfigKey := creds.Key + if kubeconfigKey == "" { + kubeconfigKey = "kubeconfig" + } + kubeconfigData, ok := secret.Data[kubeconfigKey] + if !ok { + return nil, fmt.Errorf("kubeconfig key %q not found in secret", kubeconfigKey) + } + + // Hash credentials to detect rotation without storing them + credHash := hashCredentials(kubeconfigData) + + // Check cache - invalidate if credentials changed or TTL expired + if cached, ok := c.overlayClients.Load(key); ok { + oc := cached.(*overlayClient) + if time.Since(oc.createdAt) < c.clientCacheTTL && oc.credentialHash == credHash { + return oc.client, nil + } + // Cache expired or credentials rotated + logger.V(1).Info("invalidating cached overlay client", + "reason", map[bool]string{true: "credential_rotation", false: "ttl_expired"}[oc.credentialHash != credHash]) + } + + // Create REST config from kubeconfig + restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigData) + if err != nil { + return nil, fmt.Errorf("parsing kubeconfig: %w", err) + } + + // SECURITY: Apply rate limiting and timeouts to prevent resource exhaustion + restConfig.Timeout = overlayClientTimeout + restConfig.QPS = overlayClientQPS + restConfig.Burst = overlayClientBurst + + // Set a meaningful user agent for audit logs on the overlay + restConfig.UserAgent = "eno-overlay-sync-controller" + + // Create client + oc, err := client.New(restConfig, client.Options{}) + if err != nil { + return nil, fmt.Errorf("creating overlay client: %w", err) + } + + // Cache the client with credential hash for rotation detection + c.overlayClients.Store(key, &overlayClient{ + client: oc, + createdAt: time.Now(), + credentialHash: credHash, + }) + + // SECURITY: Don't log secret name in production, only log that client was created + logger.V(1).Info("created overlay client") + return oc, nil +} + +// syncOverlayResource syncs a single overlay resource to an InputMirror +func (c *Controller) syncOverlayResource( + ctx context.Context, + symphony *apiv1.Symphony, + overlayClient client.Client, + ref apiv1.OverlayResourceRef, +) (time.Duration, error) { + logger := logr.FromContextOrDiscard(ctx).WithValues("key", ref.Key, "resourceName", ref.Resource.Name) + + // SECURITY: Validate the resource kind is allowed to be synced + gk := schema.GroupKind{Group: ref.Resource.Group, Kind: ref.Resource.Kind} + if !c.allowedKinds[gk] { + return 0, fmt.Errorf("security: resource kind %q is not allowed to be synced from overlay", gk.String()) + } + + // Fetch from overlay + obj := &unstructured.Unstructured{} + obj.SetGroupVersionKind(schema.GroupVersionKind{ + Group: ref.Resource.Group, + Version: ref.Resource.Version, + Kind: ref.Resource.Kind, + }) + + objKey := types.NamespacedName{ + Name: ref.Resource.Name, + Namespace: ref.Resource.Namespace, + } + + err := overlayClient.Get(ctx, objKey, obj) + if err != nil { + if errors.IsNotFound(err) && ref.Optional { + logger.V(1).Info("optional overlay resource not found, skipping") + // Update InputMirror to reflect missing state + return c.updateMirrorMissing(ctx, symphony, ref) + } + return 0, fmt.Errorf("getting overlay resource: %w", err) + } + + // Create/Update InputMirror + mirrorName := inputMirrorName(symphony.Name, ref.Key) + mirror := &apiv1.InputMirror{ + ObjectMeta: metav1.ObjectMeta{ + Name: mirrorName, + Namespace: symphony.Namespace, + }, + } + + result, err := controllerutil.CreateOrUpdate(ctx, c.client, mirror, func() error { + // Set owner reference + if err := controllerutil.SetControllerReference(symphony, mirror, c.scheme); err != nil { + return err + } + + // Update spec + mirror.Spec.Key = ref.Key + mirror.Spec.SymphonyRef = corev1.LocalObjectReference{Name: symphony.Name} + mirror.Spec.SourceResource = ref.Resource + + // Serialize the resource data + rawData, err := json.Marshal(obj.Object) + if err != nil { + return fmt.Errorf("marshaling resource data: %w", err) + } + mirror.Status.Data = &runtime.RawExtension{Raw: rawData} + mirror.Status.LastSyncTime = &metav1.Time{Time: time.Now()} + mirror.Status.SyncGeneration = obj.GetResourceVersion() + + // Update conditions + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from overlay cluster") + + return nil + }) + + if err != nil { + return 0, fmt.Errorf("creating/updating InputMirror: %w", err) + } + + logger.V(1).Info("synced overlay resource", "result", result, "mirrorName", mirrorName) + + // Determine requeue interval + syncInterval := DefaultSyncInterval + if ref.SyncInterval != nil { + syncInterval = ref.SyncInterval.Duration + } + return syncInterval, nil +} + +// updateMirrorMissing updates the InputMirror to reflect that the source resource is missing +func (c *Controller) updateMirrorMissing( + ctx context.Context, + symphony *apiv1.Symphony, + ref apiv1.OverlayResourceRef, +) (time.Duration, error) { + mirrorName := inputMirrorName(symphony.Name, ref.Key) + mirror := &apiv1.InputMirror{} + err := c.client.Get(ctx, types.NamespacedName{Name: mirrorName, Namespace: symphony.Namespace}, mirror) + if errors.IsNotFound(err) { + // No mirror exists, nothing to update + return DefaultSyncInterval, nil + } + if err != nil { + return 0, err + } + + // Update condition to reflect missing state + setSyncedCondition(mirror, false, "SourceNotFound", "Optional source resource not found in overlay") + mirror.Status.Data = nil + + if err := c.client.Status().Update(ctx, mirror); err != nil { + return 0, err + } + + syncInterval := DefaultSyncInterval + if ref.SyncInterval != nil { + syncInterval = ref.SyncInterval.Duration + } + return syncInterval, nil +} + +// cleanupOrphanedMirrors removes InputMirrors for refs that no longer exist in the Symphony +func (c *Controller) cleanupOrphanedMirrors(ctx context.Context, symphony *apiv1.Symphony) error { + logger := logr.FromContextOrDiscard(ctx) + + // List all InputMirrors owned by this Symphony + mirrors := &apiv1.InputMirrorList{} + if err := c.client.List(ctx, mirrors, + client.InNamespace(symphony.Namespace), + client.MatchingFields{"spec.symphonyRef.name": symphony.Name}, + ); err != nil { + // If the index isn't set up, fall back to filtering manually + if err := c.client.List(ctx, mirrors, client.InNamespace(symphony.Namespace)); err != nil { + return err + } + } + + // Build set of expected mirror names + expected := make(map[string]struct{}) + for _, ref := range symphony.Spec.OverlayResourceRefs { + expected[inputMirrorName(symphony.Name, ref.Key)] = struct{}{} + } + + // Delete orphaned mirrors + for _, mirror := range mirrors.Items { + // Check if owned by this symphony + if mirror.Spec.SymphonyRef.Name != symphony.Name { + continue + } + if _, ok := expected[mirror.Name]; !ok { + logger.V(1).Info("deleting orphaned InputMirror", "mirrorName", mirror.Name) + if err := c.client.Delete(ctx, &mirror); err != nil && !errors.IsNotFound(err) { + return err + } + } + } + + return nil +} + +// inputMirrorName generates the name for an InputMirror +func inputMirrorName(symphonyName, key string) string { + return fmt.Sprintf("%s-%s", symphonyName, key) +} + +// setSyncedCondition updates the Synced condition on an InputMirror +func setSyncedCondition(mirror *apiv1.InputMirror, synced bool, reason, message string) { + status := metav1.ConditionFalse + if synced { + status = metav1.ConditionTrue + } + + now := metav1.Now() + condition := metav1.Condition{ + Type: ConditionTypeSynced, + Status: status, + ObservedGeneration: mirror.Generation, + LastTransitionTime: now, + Reason: reason, + Message: message, + } + + // Find and update existing condition or append + for i, c := range mirror.Status.Conditions { + if c.Type == ConditionTypeSynced { + if c.Status != condition.Status { + mirror.Status.Conditions[i] = condition + } else { + // Only update reason/message, keep transition time + mirror.Status.Conditions[i].Reason = reason + mirror.Status.Conditions[i].Message = message + mirror.Status.Conditions[i].ObservedGeneration = mirror.Generation + } + return + } + } + mirror.Status.Conditions = append(mirror.Status.Conditions, condition) +} diff --git a/internal/controllers/overlaysync/controller_test.go b/internal/controllers/overlaysync/controller_test.go new file mode 100644 index 00000000..ba4b63e3 --- /dev/null +++ b/internal/controllers/overlaysync/controller_test.go @@ -0,0 +1,172 @@ +package overlaysync + +import ( + "context" + "testing" + "time" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func TestInputMirrorName(t *testing.T) { + tests := []struct { + symphonyName string + key string + expected string + }{ + {"symphony-123", "metricsSettings", "symphony-123-metricsSettings"}, + {"my-symphony", "config", "my-symphony-config"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + result := inputMirrorName(tt.symphonyName, tt.key) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestSetSyncedCondition(t *testing.T) { + mirror := &apiv1.InputMirror{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-mirror", + Namespace: "test-ns", + Generation: 1, + }, + } + + // Test setting synced=true + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced") + + require.Len(t, mirror.Status.Conditions, 1) + assert.Equal(t, ConditionTypeSynced, mirror.Status.Conditions[0].Type) + assert.Equal(t, metav1.ConditionTrue, mirror.Status.Conditions[0].Status) + assert.Equal(t, "SyncSuccess", mirror.Status.Conditions[0].Reason) + assert.Equal(t, "Successfully synced", mirror.Status.Conditions[0].Message) + + // Test updating to synced=false + setSyncedCondition(mirror, false, "SyncFailed", "Failed to sync") + + require.Len(t, mirror.Status.Conditions, 1) + assert.Equal(t, metav1.ConditionFalse, mirror.Status.Conditions[0].Status) + assert.Equal(t, "SyncFailed", mirror.Status.Conditions[0].Reason) +} + +func TestReconcile_NoOverlayRefs(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + symphony := &apiv1.Symphony{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-symphony", + Namespace: "test-ns", + }, + Spec: apiv1.SymphonySpec{ + // No OverlayResourceRefs + }, + } + + client := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(symphony). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + clientCacheTTL: 10 * time.Minute, + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test-symphony", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + assert.Equal(t, reconcile.Result{}, result) +} + +func TestReconcile_NoCredentials(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + symphony := &apiv1.Symphony{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-symphony", + Namespace: "test-ns", + }, + Spec: apiv1.SymphonySpec{ + OverlayResourceRefs: []apiv1.OverlayResourceRef{ + { + Key: "test", + Resource: apiv1.OverlayResourceSelector{ + Kind: "ConfigMap", + Version: "v1", + Name: "test-cm", + }, + }, + }, + // No OverlayCredentials + }, + } + + client := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(symphony). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + clientCacheTTL: 10 * time.Minute, + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test-symphony", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + // Should return empty result since no credentials + assert.Equal(t, reconcile.Result{}, result) +} + +func TestReconcile_SymphonyNotFound(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + client := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + clientCacheTTL: 10 * time.Minute, + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "non-existent", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + assert.Equal(t, reconcile.Result{}, result) +} From 9a1a0e9663ffa8748977b6745a300ea426027d8b Mon Sep 17 00:00:00 2001 From: david kydd Date: Mon, 15 Dec 2025 13:45:13 +1300 Subject: [PATCH 2/9] fix: update InputMirror status via Status().Update() subresource CreateOrUpdate only persists spec changes, not status changes. Status must be updated separately via the status subresource. --- .../controllers/overlaysync/controller.go | 28 +++++++++++-------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/internal/controllers/overlaysync/controller.go b/internal/controllers/overlaysync/controller.go index 6a0ad91e..39d67604 100644 --- a/internal/controllers/overlaysync/controller.go +++ b/internal/controllers/overlaysync/controller.go @@ -314,18 +314,6 @@ func (c *Controller) syncOverlayResource( mirror.Spec.SymphonyRef = corev1.LocalObjectReference{Name: symphony.Name} mirror.Spec.SourceResource = ref.Resource - // Serialize the resource data - rawData, err := json.Marshal(obj.Object) - if err != nil { - return fmt.Errorf("marshaling resource data: %w", err) - } - mirror.Status.Data = &runtime.RawExtension{Raw: rawData} - mirror.Status.LastSyncTime = &metav1.Time{Time: time.Now()} - mirror.Status.SyncGeneration = obj.GetResourceVersion() - - // Update conditions - setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from overlay cluster") - return nil }) @@ -333,6 +321,22 @@ func (c *Controller) syncOverlayResource( return 0, fmt.Errorf("creating/updating InputMirror: %w", err) } + // Update status separately - CreateOrUpdate only updates spec, not status subresource + rawData, err := json.Marshal(obj.Object) + if err != nil { + return 0, fmt.Errorf("marshaling resource data: %w", err) + } + mirror.Status.Data = &runtime.RawExtension{Raw: rawData} + mirror.Status.LastSyncTime = &metav1.Time{Time: time.Now()} + mirror.Status.SyncGeneration = obj.GetResourceVersion() + + // Update conditions + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from overlay cluster") + + if err := c.client.Status().Update(ctx, mirror); err != nil { + return 0, fmt.Errorf("updating InputMirror status: %w", err) + } + logger.V(1).Info("synced overlay resource", "result", result, "mirrorName", mirrorName) // Determine requeue interval From 1352e1080686a3984a457e77670b5e0e62b4f7fe Mon Sep 17 00:00:00 2001 From: david kydd Date: Wed, 17 Dec 2025 14:17:20 +1300 Subject: [PATCH 3/9] parallelize controller syncing --- .../controllers/overlaysync/controller.go | 102 +++++++++++++++--- 1 file changed, 90 insertions(+), 12 deletions(-) diff --git a/internal/controllers/overlaysync/controller.go b/internal/controllers/overlaysync/controller.go index 39d67604..48b90a55 100644 --- a/internal/controllers/overlaysync/controller.go +++ b/internal/controllers/overlaysync/controller.go @@ -21,6 +21,7 @@ import ( apiv1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/manager" "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -48,6 +49,12 @@ const ( overlayClientTimeout = 30 * time.Second overlayClientQPS = 5 overlayClientBurst = 10 + + // maxSyncConcurrency limits parallel overlay resource fetches per Symphony. + // This prevents overwhelming the overlay cluster's API server while still + // providing significant speedup over sequential syncing. + // With 100 refs at ~50ms each: sequential = 5s, parallel (10) = 500ms + maxSyncConcurrency = 10 ) // AllowedSyncKinds defines which resource kinds can be synced from overlay. @@ -141,18 +148,8 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{RequeueAfter: time.Minute}, nil } - // Sync each overlay resource ref - var minRequeue time.Duration - for _, ref := range symphony.Spec.OverlayResourceRefs { - requeue, err := c.syncOverlayResource(ctx, symphony, overlayClient, ref) - if err != nil { - logger.Error(err, "failed to sync overlay resource", "key", ref.Key) - // Continue with other refs - } - if requeue > 0 && (minRequeue == 0 || requeue < minRequeue) { - minRequeue = requeue - } - } + // Sync all overlay resource refs in parallel with bounded concurrency + minRequeue := c.syncOverlayResourcesParallel(ctx, symphony, overlayClient) // Clean up InputMirrors for refs that no longer exist if err := c.cleanupOrphanedMirrors(ctx, symphony); err != nil { @@ -256,6 +253,87 @@ func (c *Controller) getOrCreateOverlayClient(ctx context.Context, symphony *api return oc, nil } +// syncResult holds the result of syncing a single overlay resource +type syncResult struct { + key string + requeue time.Duration + err error +} + +// syncOverlayResourcesParallel syncs all overlay resource refs in parallel with bounded concurrency. +// This reduces reconcile latency from O(n * latency) to O(n/concurrency * latency). +// For example, with 100 refs at ~50ms each: sequential = 5s, parallel (10) = ~500ms. +func (c *Controller) syncOverlayResourcesParallel( + ctx context.Context, + symphony *apiv1.Symphony, + overlayClient client.Client, +) time.Duration { + logger := logr.FromContextOrDiscard(ctx) + refs := symphony.Spec.OverlayResourceRefs + + if len(refs) == 0 { + return DefaultSyncInterval + } + + // Use a semaphore to limit concurrent overlay API calls + sem := make(chan struct{}, maxSyncConcurrency) + results := make(chan syncResult, len(refs)) + + // Use errgroup for structured concurrency, but we don't fail fast on errors + // since we want to sync as many refs as possible + g, ctx := errgroup.WithContext(ctx) + + for _, ref := range refs { + ref := ref // capture loop variable + g.Go(func() error { + // Acquire semaphore + select { + case sem <- struct{}{}: + defer func() { <-sem }() + case <-ctx.Done(): + results <- syncResult{key: ref.Key, err: ctx.Err()} + return nil + } + + requeue, err := c.syncOverlayResource(ctx, symphony, overlayClient, ref) + results <- syncResult{key: ref.Key, requeue: requeue, err: err} + return nil // Don't propagate errors - we handle them individually + }) + } + + // Wait for all goroutines to complete + _ = g.Wait() + close(results) + + // Process results and determine minimum requeue time + var minRequeue time.Duration + var successCount, failCount int + + for result := range results { + if result.err != nil { + logger.Error(result.err, "failed to sync overlay resource", "key", result.key) + failCount++ + continue + } + successCount++ + if result.requeue > 0 && (minRequeue == 0 || result.requeue < minRequeue) { + minRequeue = result.requeue + } + } + + logger.V(1).Info("completed parallel overlay sync", + "total", len(refs), + "success", successCount, + "failed", failCount, + "minRequeue", minRequeue, + ) + + if minRequeue == 0 { + return DefaultSyncInterval + } + return minRequeue +} + // syncOverlayResource syncs a single overlay resource to an InputMirror func (c *Controller) syncOverlayResource( ctx context.Context, From 719ccdc88b8eca378115563792d9b704b663ea79 Mon Sep 17 00:00:00 2001 From: david kydd Date: Thu, 18 Dec 2025 09:49:48 +1300 Subject: [PATCH 4/9] migrate to use watches / informers rather than rely on polling in controller --- .../controllers/overlaysync/controller.go | 376 +++++++++++++----- .../overlaysync/controller_test.go | 26 +- 2 files changed, 283 insertions(+), 119 deletions(-) diff --git a/internal/controllers/overlaysync/controller.go b/internal/controllers/overlaysync/controller.go index 48b90a55..a8af90a3 100644 --- a/internal/controllers/overlaysync/controller.go +++ b/internal/controllers/overlaysync/controller.go @@ -1,11 +1,18 @@ // Package overlaysync implements the OverlaySyncController which syncs resources // from overlay clusters to the underlay as InputMirror resources. // +// ARCHITECTURE: +// This controller uses a watch-based approach for real-time sync instead of polling: +// 1. Each Symphony with overlay refs gets a dedicated "overlay watcher" +// 2. The watcher sets up dynamic informers on the overlay cluster for each ref +// 3. When a watched resource changes, the informer triggers a reconcile +// 4. The reconciler syncs the changed resource to the corresponding InputMirror +// // SECURITY CONSIDERATIONS: // - Overlay credentials are stored in Secrets and never logged // - Secret access is restricted to the Symphony's namespace by default // - REST client has timeouts to prevent resource exhaustion -// - Cached clients are invalidated on credential rotation +// - Cached watchers are invalidated on credential rotation // - Only specified resource types can be synced (no arbitrary access) package overlaysync @@ -29,7 +36,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -39,8 +50,12 @@ const ( // ConditionTypeSynced indicates whether the InputMirror has been successfully synced ConditionTypeSynced = "Synced" - // DefaultSyncInterval is the default interval for re-syncing overlay resources - DefaultSyncInterval = 5 * time.Minute + // FallbackSyncInterval is used when watches fail or as a safety net for missed events. + // Watches provide real-time updates, so this is only a fallback. + FallbackSyncInterval = 30 * time.Minute + + // WatchResyncPeriod is how often the informer re-lists all objects as a consistency check + WatchResyncPeriod = 10 * time.Minute // FinalizerName is the finalizer added to InputMirrors FinalizerName = "eno.azure.io/overlay-sync" @@ -65,11 +80,25 @@ var AllowedSyncKinds = map[schema.GroupKind]bool{ // Explicitly NOT allowing: Secret, ServiceAccount, etc. } -// overlayClient holds a cached client for an overlay cluster -type overlayClient struct { - client client.Client - createdAt time.Time +// overlayWatcher manages watch connections to a single overlay cluster. +// It maintains dynamic informers for each resource type being watched. +type overlayWatcher struct { + mu sync.RWMutex + + symphonyKey string // namespace/name of the symphony credentialHash string // Hash of credentials to detect rotation + createdAt time.Time + + // Dynamic client and informer factory for the overlay cluster + dynamicClient dynamic.Interface + informerFactory dynamicinformer.DynamicSharedInformerFactory + stopCh chan struct{} + + // Track which GVRs we're watching + watchedGVRs map[schema.GroupVersionResource]struct{} + + // Reference to the controller for enqueuing reconciles + controller *Controller } // Controller reconciles Symphonies with overlay resource refs, syncing resources @@ -78,23 +107,29 @@ type Controller struct { client client.Client scheme *runtime.Scheme - // overlayClients caches overlay cluster clients keyed by symphony namespace/name - overlayClients sync.Map + // overlayWatchers manages watch connections keyed by symphony namespace/name + overlayWatchers sync.Map - // clientCacheTTL determines how long overlay clients are cached - clientCacheTTL time.Duration + // watcherCacheTTL determines how long overlay watchers are cached before refresh + watcherCacheTTL time.Duration // allowedKinds can be overridden for testing allowedKinds map[schema.GroupKind]bool + + // reconcileQueue is used by informer callbacks to enqueue Symphony reconciles + reconcileQueue workqueue.TypedRateLimitingInterface[ctrl.Request] } // NewController creates a new OverlaySyncController and registers it with the manager. func NewController(mgr ctrl.Manager) error { c := &Controller{ - client: mgr.GetClient(), - scheme: mgr.GetScheme(), - clientCacheTTL: 10 * time.Minute, - allowedKinds: AllowedSyncKinds, + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + watcherCacheTTL: 30 * time.Minute, + allowedKinds: AllowedSyncKinds, + reconcileQueue: workqueue.NewTypedRateLimitingQueue( + workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](), + ), } return ctrl.NewControllerManagedBy(mgr). @@ -110,8 +145,8 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu symphony := &apiv1.Symphony{} if err := c.client.Get(ctx, req.NamespacedName, symphony); err != nil { if errors.IsNotFound(err) { - // Symphony deleted, overlay clients will be cleaned up by GC - c.overlayClients.Delete(req.String()) + // Symphony deleted, cleanup watcher + c.cleanupWatcher(req.String()) return ctrl.Result{}, nil } return ctrl.Result{}, err @@ -125,41 +160,56 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Skip if no overlay resource refs defined if len(symphony.Spec.OverlayResourceRefs) == 0 { + c.cleanupWatcher(req.String()) return ctrl.Result{}, nil } // Skip if no overlay credentials provided if symphony.Spec.OverlayCredentials == nil { logger.V(1).Info("symphony has overlay resource refs but no credentials, skipping") + c.cleanupWatcher(req.String()) return ctrl.Result{}, nil } // Handle deletion if symphony.DeletionTimestamp != nil { // InputMirrors will be garbage collected via owner references - c.overlayClients.Delete(req.String()) + c.cleanupWatcher(req.String()) return ctrl.Result{}, nil } - // Get or create overlay client - overlayClient, err := c.getOrCreateOverlayClient(ctx, symphony) + // Get or create overlay watcher with watches + watcher, err := c.getOrCreateOverlayWatcher(ctx, symphony) if err != nil { - logger.Error(err, "failed to create overlay client") + logger.Error(err, "failed to create overlay watcher") + return ctrl.Result{RequeueAfter: time.Minute}, nil + } + + // Ensure informers are set up for all resource refs + if err := watcher.ensureInformers(ctx, symphony); err != nil { + logger.Error(err, "failed to setup informers") return ctrl.Result{RequeueAfter: time.Minute}, nil } // Sync all overlay resource refs in parallel with bounded concurrency - minRequeue := c.syncOverlayResourcesParallel(ctx, symphony, overlayClient) + // This handles the initial sync and any changes detected by watches + c.syncOverlayResourcesParallel(ctx, symphony, watcher) // Clean up InputMirrors for refs that no longer exist if err := c.cleanupOrphanedMirrors(ctx, symphony); err != nil { logger.Error(err, "failed to cleanup orphaned mirrors") } - if minRequeue > 0 { - return ctrl.Result{RequeueAfter: minRequeue}, nil + // With watches, we only need periodic reconciles as a fallback safety net + return ctrl.Result{RequeueAfter: FallbackSyncInterval}, nil +} + +// cleanupWatcher stops and removes the overlay watcher for a symphony +func (c *Controller) cleanupWatcher(key string) { + if val, ok := c.overlayWatchers.LoadAndDelete(key); ok { + watcher := val.(*overlayWatcher) + close(watcher.stopCh) } - return ctrl.Result{RequeueAfter: DefaultSyncInterval}, nil } // hashCredentials creates a SHA256 hash of credential data for change detection. @@ -169,9 +219,9 @@ func hashCredentials(data []byte) string { return hex.EncodeToString(h[:]) } -// getOrCreateOverlayClient gets a cached overlay client or creates a new one. -// Security: Credentials are never logged, client has timeouts, cache invalidates on rotation. -func (c *Controller) getOrCreateOverlayClient(ctx context.Context, symphony *apiv1.Symphony) (client.Client, error) { +// getOrCreateOverlayWatcher gets a cached overlay watcher or creates a new one. +// Security: Credentials are never logged, watchers have timeouts, cache invalidates on rotation. +func (c *Controller) getOrCreateOverlayWatcher(ctx context.Context, symphony *apiv1.Symphony) (*overlayWatcher, error) { logger := logr.FromContextOrDiscard(ctx) key := fmt.Sprintf("%s/%s", symphony.Namespace, symphony.Name) @@ -184,7 +234,6 @@ func (c *Controller) getOrCreateOverlayClient(ctx context.Context, symphony *api } // SECURITY: Only allow accessing secrets in the Symphony's namespace - // This prevents cross-namespace credential access if secretKey.Namespace == "" { secretKey.Namespace = symphony.Namespace } @@ -211,14 +260,15 @@ func (c *Controller) getOrCreateOverlayClient(ctx context.Context, symphony *api credHash := hashCredentials(kubeconfigData) // Check cache - invalidate if credentials changed or TTL expired - if cached, ok := c.overlayClients.Load(key); ok { - oc := cached.(*overlayClient) - if time.Since(oc.createdAt) < c.clientCacheTTL && oc.credentialHash == credHash { - return oc.client, nil + if cached, ok := c.overlayWatchers.Load(key); ok { + watcher := cached.(*overlayWatcher) + if time.Since(watcher.createdAt) < c.watcherCacheTTL && watcher.credentialHash == credHash { + return watcher, nil } - // Cache expired or credentials rotated - logger.V(1).Info("invalidating cached overlay client", - "reason", map[bool]string{true: "credential_rotation", false: "ttl_expired"}[oc.credentialHash != credHash]) + // Cache expired or credentials rotated - cleanup old watcher + logger.V(1).Info("invalidating cached overlay watcher", + "reason", map[bool]string{true: "credential_rotation", false: "ttl_expired"}[watcher.credentialHash != credHash]) + c.cleanupWatcher(key) } // Create REST config from kubeconfig @@ -227,37 +277,179 @@ func (c *Controller) getOrCreateOverlayClient(ctx context.Context, symphony *api return nil, fmt.Errorf("parsing kubeconfig: %w", err) } - // SECURITY: Apply rate limiting and timeouts to prevent resource exhaustion + // SECURITY: Apply rate limiting and timeouts restConfig.Timeout = overlayClientTimeout restConfig.QPS = overlayClientQPS restConfig.Burst = overlayClientBurst - - // Set a meaningful user agent for audit logs on the overlay restConfig.UserAgent = "eno-overlay-sync-controller" - // Create client - oc, err := client.New(restConfig, client.Options{}) + // Create dynamic client for informers + dynamicClient, err := dynamic.NewForConfig(restConfig) if err != nil { - return nil, fmt.Errorf("creating overlay client: %w", err) + return nil, fmt.Errorf("creating dynamic client: %w", err) } - // Cache the client with credential hash for rotation detection - c.overlayClients.Store(key, &overlayClient{ - client: oc, - createdAt: time.Now(), - credentialHash: credHash, - }) + // Create informer factory + stopCh := make(chan struct{}) + informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, WatchResyncPeriod) + + watcher := &overlayWatcher{ + symphonyKey: key, + credentialHash: credHash, + createdAt: time.Now(), + dynamicClient: dynamicClient, + informerFactory: informerFactory, + stopCh: stopCh, + watchedGVRs: make(map[schema.GroupVersionResource]struct{}), + controller: c, + } + + // Cache the watcher + c.overlayWatchers.Store(key, watcher) + logger.V(1).Info("created overlay watcher") + + return watcher, nil +} + +// ensureInformers sets up informers for all resource refs in the symphony +func (w *overlayWatcher) ensureInformers(ctx context.Context, symphony *apiv1.Symphony) error { + logger := logr.FromContextOrDiscard(ctx) + w.mu.Lock() + defer w.mu.Unlock() + + // Track which GVRs we need + neededGVRs := make(map[schema.GroupVersionResource][]apiv1.OverlayResourceRef) + for _, ref := range symphony.Spec.OverlayResourceRefs { + gvr := schema.GroupVersionResource{ + Group: ref.Resource.Group, + Version: ref.Resource.Version, + Resource: pluralize(ref.Resource.Kind), + } + neededGVRs[gvr] = append(neededGVRs[gvr], ref) + } + + // Set up informers for new GVRs + for gvr, refs := range neededGVRs { + if _, exists := w.watchedGVRs[gvr]; exists { + continue + } + + logger.V(1).Info("setting up informer for GVR", "gvr", gvr.String()) + + informer := w.informerFactory.ForResource(gvr).Informer() + + // Add event handler that triggers reconcile on changes + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + w.enqueueReconcile(ctx, symphony, refs, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + w.enqueueReconcile(ctx, symphony, refs, newObj) + }, + DeleteFunc: func(obj interface{}) { + w.enqueueReconcile(ctx, symphony, refs, obj) + }, + }) + if err != nil { + return fmt.Errorf("adding event handler for %s: %w", gvr.String(), err) + } + + w.watchedGVRs[gvr] = struct{}{} + } + + // Start the informer factory (idempotent if already started) + w.informerFactory.Start(w.stopCh) + + // Wait for caches to sync + synced := w.informerFactory.WaitForCacheSync(w.stopCh) + for gvr, ok := range synced { + if !ok { + logger.Error(nil, "failed to sync informer cache", "gvr", gvr.String()) + } + } + + return nil +} + +// enqueueReconcile checks if the object matches any refs and enqueues a reconcile if so +func (w *overlayWatcher) enqueueReconcile(ctx context.Context, symphony *apiv1.Symphony, refs []apiv1.OverlayResourceRef, obj interface{}) { + logger := logr.FromContextOrDiscard(ctx) - // SECURITY: Don't log secret name in production, only log that client was created - logger.V(1).Info("created overlay client") - return oc, nil + u, ok := obj.(*unstructured.Unstructured) + if !ok { + // Handle DeletedFinalStateUnknown + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + u, ok = tombstone.Obj.(*unstructured.Unstructured) + if !ok { + return + } + } else { + return + } + } + + // Check if this object matches any of our refs + for _, ref := range refs { + if matchesRef(u, ref) { + logger.V(2).Info("overlay resource changed, enqueueing reconcile", + "resource", u.GetName(), + "namespace", u.GetNamespace(), + "key", ref.Key, + ) + // Enqueue the symphony for reconcile + w.controller.reconcileQueue.Add(ctrl.Request{ + NamespacedName: types.NamespacedName{ + Name: symphony.Name, + Namespace: symphony.Namespace, + }, + }) + return + } + } +} + +// matchesRef checks if an unstructured object matches an overlay resource ref +func matchesRef(obj *unstructured.Unstructured, ref apiv1.OverlayResourceRef) bool { + return obj.GetName() == ref.Resource.Name && + obj.GetNamespace() == ref.Resource.Namespace +} + +// pluralize converts a Kind to its plural resource name (simple heuristic) +func pluralize(kind string) string { + lower := string(kind[0]+32) + kind[1:] // lowercase first letter + if lower[len(lower)-1] == 's' { + return lower + "es" + } + if lower[len(lower)-1] == 'y' { + return lower[:len(lower)-1] + "ies" + } + return lower + "s" +} + +// getResource fetches a resource from the overlay cluster using the dynamic client +func (w *overlayWatcher) getResource(ctx context.Context, ref apiv1.OverlayResourceRef) (*unstructured.Unstructured, error) { + gvr := schema.GroupVersionResource{ + Group: ref.Resource.Group, + Version: ref.Resource.Version, + Resource: pluralize(ref.Resource.Kind), + } + + var obj *unstructured.Unstructured + var err error + + if ref.Resource.Namespace != "" { + obj, err = w.dynamicClient.Resource(gvr).Namespace(ref.Resource.Namespace).Get(ctx, ref.Resource.Name, metav1.GetOptions{}) + } else { + obj, err = w.dynamicClient.Resource(gvr).Get(ctx, ref.Resource.Name, metav1.GetOptions{}) + } + + return obj, err } // syncResult holds the result of syncing a single overlay resource type syncResult struct { - key string - requeue time.Duration - err error + key string + err error } // syncOverlayResourcesParallel syncs all overlay resource refs in parallel with bounded concurrency. @@ -266,13 +458,13 @@ type syncResult struct { func (c *Controller) syncOverlayResourcesParallel( ctx context.Context, symphony *apiv1.Symphony, - overlayClient client.Client, -) time.Duration { + watcher *overlayWatcher, +) { logger := logr.FromContextOrDiscard(ctx) refs := symphony.Spec.OverlayResourceRefs if len(refs) == 0 { - return DefaultSyncInterval + return } // Use a semaphore to limit concurrent overlay API calls @@ -295,8 +487,8 @@ func (c *Controller) syncOverlayResourcesParallel( return nil } - requeue, err := c.syncOverlayResource(ctx, symphony, overlayClient, ref) - results <- syncResult{key: ref.Key, requeue: requeue, err: err} + err := c.syncOverlayResource(ctx, symphony, watcher, ref) + results <- syncResult{key: ref.Key, err: err} return nil // Don't propagate errors - we handle them individually }) } @@ -305,8 +497,7 @@ func (c *Controller) syncOverlayResourcesParallel( _ = g.Wait() close(results) - // Process results and determine minimum requeue time - var minRequeue time.Duration + // Process results var successCount, failCount int for result := range results { @@ -316,60 +507,39 @@ func (c *Controller) syncOverlayResourcesParallel( continue } successCount++ - if result.requeue > 0 && (minRequeue == 0 || result.requeue < minRequeue) { - minRequeue = result.requeue - } } logger.V(1).Info("completed parallel overlay sync", "total", len(refs), "success", successCount, "failed", failCount, - "minRequeue", minRequeue, ) - - if minRequeue == 0 { - return DefaultSyncInterval - } - return minRequeue } // syncOverlayResource syncs a single overlay resource to an InputMirror func (c *Controller) syncOverlayResource( ctx context.Context, symphony *apiv1.Symphony, - overlayClient client.Client, + watcher *overlayWatcher, ref apiv1.OverlayResourceRef, -) (time.Duration, error) { +) error { logger := logr.FromContextOrDiscard(ctx).WithValues("key", ref.Key, "resourceName", ref.Resource.Name) // SECURITY: Validate the resource kind is allowed to be synced gk := schema.GroupKind{Group: ref.Resource.Group, Kind: ref.Resource.Kind} if !c.allowedKinds[gk] { - return 0, fmt.Errorf("security: resource kind %q is not allowed to be synced from overlay", gk.String()) - } - - // Fetch from overlay - obj := &unstructured.Unstructured{} - obj.SetGroupVersionKind(schema.GroupVersionKind{ - Group: ref.Resource.Group, - Version: ref.Resource.Version, - Kind: ref.Resource.Kind, - }) - - objKey := types.NamespacedName{ - Name: ref.Resource.Name, - Namespace: ref.Resource.Namespace, + return fmt.Errorf("security: resource kind %q is not allowed to be synced from overlay", gk.String()) } - err := overlayClient.Get(ctx, objKey, obj) + // Fetch from overlay using the watcher's dynamic client + obj, err := watcher.getResource(ctx, ref) if err != nil { if errors.IsNotFound(err) && ref.Optional { logger.V(1).Info("optional overlay resource not found, skipping") // Update InputMirror to reflect missing state return c.updateMirrorMissing(ctx, symphony, ref) } - return 0, fmt.Errorf("getting overlay resource: %w", err) + return fmt.Errorf("getting overlay resource: %w", err) } // Create/Update InputMirror @@ -396,13 +566,13 @@ func (c *Controller) syncOverlayResource( }) if err != nil { - return 0, fmt.Errorf("creating/updating InputMirror: %w", err) + return fmt.Errorf("creating/updating InputMirror: %w", err) } // Update status separately - CreateOrUpdate only updates spec, not status subresource rawData, err := json.Marshal(obj.Object) if err != nil { - return 0, fmt.Errorf("marshaling resource data: %w", err) + return fmt.Errorf("marshaling resource data: %w", err) } mirror.Status.Data = &runtime.RawExtension{Raw: rawData} mirror.Status.LastSyncTime = &metav1.Time{Time: time.Now()} @@ -412,17 +582,11 @@ func (c *Controller) syncOverlayResource( setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from overlay cluster") if err := c.client.Status().Update(ctx, mirror); err != nil { - return 0, fmt.Errorf("updating InputMirror status: %w", err) + return fmt.Errorf("updating InputMirror status: %w", err) } logger.V(1).Info("synced overlay resource", "result", result, "mirrorName", mirrorName) - - // Determine requeue interval - syncInterval := DefaultSyncInterval - if ref.SyncInterval != nil { - syncInterval = ref.SyncInterval.Duration - } - return syncInterval, nil + return nil } // updateMirrorMissing updates the InputMirror to reflect that the source resource is missing @@ -430,31 +594,23 @@ func (c *Controller) updateMirrorMissing( ctx context.Context, symphony *apiv1.Symphony, ref apiv1.OverlayResourceRef, -) (time.Duration, error) { +) error { mirrorName := inputMirrorName(symphony.Name, ref.Key) mirror := &apiv1.InputMirror{} err := c.client.Get(ctx, types.NamespacedName{Name: mirrorName, Namespace: symphony.Namespace}, mirror) if errors.IsNotFound(err) { // No mirror exists, nothing to update - return DefaultSyncInterval, nil + return nil } if err != nil { - return 0, err + return err } // Update condition to reflect missing state setSyncedCondition(mirror, false, "SourceNotFound", "Optional source resource not found in overlay") mirror.Status.Data = nil - if err := c.client.Status().Update(ctx, mirror); err != nil { - return 0, err - } - - syncInterval := DefaultSyncInterval - if ref.SyncInterval != nil { - syncInterval = ref.SyncInterval.Duration - } - return syncInterval, nil + return c.client.Status().Update(ctx, mirror) } // cleanupOrphanedMirrors removes InputMirrors for refs that no longer exist in the Symphony diff --git a/internal/controllers/overlaysync/controller_test.go b/internal/controllers/overlaysync/controller_test.go index ba4b63e3..9712495d 100644 --- a/internal/controllers/overlaysync/controller_test.go +++ b/internal/controllers/overlaysync/controller_test.go @@ -12,6 +12,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -81,9 +83,11 @@ func TestReconcile_NoOverlayRefs(t *testing.T) { Build() controller := &Controller{ - client: client, - scheme: scheme, - clientCacheTTL: 10 * time.Minute, + client: client, + scheme: scheme, + watcherCacheTTL: 30 * time.Minute, + allowedKinds: AllowedSyncKinds, + reconcileQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[ctrl.Request]()), } result, err := controller.Reconcile(context.Background(), reconcile.Request{ @@ -128,9 +132,11 @@ func TestReconcile_NoCredentials(t *testing.T) { Build() controller := &Controller{ - client: client, - scheme: scheme, - clientCacheTTL: 10 * time.Minute, + client: client, + scheme: scheme, + watcherCacheTTL: 30 * time.Minute, + allowedKinds: AllowedSyncKinds, + reconcileQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[ctrl.Request]()), } result, err := controller.Reconcile(context.Background(), reconcile.Request{ @@ -155,9 +161,11 @@ func TestReconcile_SymphonyNotFound(t *testing.T) { Build() controller := &Controller{ - client: client, - scheme: scheme, - clientCacheTTL: 10 * time.Minute, + client: client, + scheme: scheme, + watcherCacheTTL: 30 * time.Minute, + allowedKinds: AllowedSyncKinds, + reconcileQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[ctrl.Request]()), } result, err := controller.Reconcile(context.Background(), reconcile.Request{ From a35b578b96fa1e51294d14a58c19558c67ce23f2 Mon Sep 17 00:00:00 2001 From: david kydd Date: Thu, 18 Dec 2025 15:16:28 +1300 Subject: [PATCH 5/9] migrate overlaysync controller from eno-controller to eno-reconciler --- api/v1/inputmirror.go | 11 - api/v1/symphony.go | 7 +- api/v1/zz_generated.deepcopy.go | 309 ++++++++---------- cmd/eno-controller/main.go | 7 +- cmd/eno-reconciler/main.go | 8 + .../controllers/overlaysync/controller.go | 207 ++++-------- .../overlaysync/controller_test.go | 34 +- 7 files changed, 223 insertions(+), 360 deletions(-) diff --git a/api/v1/inputmirror.go b/api/v1/inputmirror.go index a91de7af..ffe0e1de 100644 --- a/api/v1/inputmirror.go +++ b/api/v1/inputmirror.go @@ -79,17 +79,6 @@ type OverlayResourceSelector struct { Namespace string `json:"namespace,omitempty"` } -// OverlayCredentials specifies how to access an overlay cluster -type OverlayCredentials struct { - // SecretRef references a Secret containing the kubeconfig for the overlay cluster - SecretRef corev1.SecretReference `json:"secretRef"` - - // Key within the secret containing the kubeconfig data - // +kubebuilder:default="kubeconfig" - // +optional - Key string `json:"key,omitempty"` -} - // OverlayResourceRef defines a resource to sync from an overlay cluster type OverlayResourceRef struct { // Key that will be used to reference this input in Composition bindings. diff --git a/api/v1/symphony.go b/api/v1/symphony.go index 94c3ccd3..79a1a174 100644 --- a/api/v1/symphony.go +++ b/api/v1/symphony.go @@ -40,14 +40,9 @@ type SymphonySpec struct { // +kubebuilder:validation:MaxItems:=50 SynthesisEnv []EnvVar `json:"synthesisEnv,omitempty"` // deprecated synthesis env should always be variation scoped. - // OverlayCredentials specifies how to access the overlay cluster. - // When set, the OverlaySyncController will use these credentials to sync - // resources specified in OverlayResourceRefs. - // +optional - OverlayCredentials *OverlayCredentials `json:"overlayCredentials,omitempty"` - // OverlayResourceRefs specifies resources to sync from the overlay cluster. // Each ref results in an InputMirror being created that can be bound as an input. + // The overlay cluster is accessed via the eno-reconciler's --remote-kubeconfig. // +optional // +kubebuilder:validation:MaxItems:=20 OverlayResourceRefs []OverlayResourceRef `json:"overlayResourceRefs,omitempty"` diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index ab362c98..b935e7ed 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -7,7 +7,7 @@ package v1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -185,6 +185,113 @@ func (in *Input) DeepCopy() *Input { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirror) DeepCopyInto(out *InputMirror) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirror. +func (in *InputMirror) DeepCopy() *InputMirror { + if in == nil { + return nil + } + out := new(InputMirror) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirror) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorList) DeepCopyInto(out *InputMirrorList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]InputMirror, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorList. +func (in *InputMirrorList) DeepCopy() *InputMirrorList { + if in == nil { + return nil + } + out := new(InputMirrorList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirrorList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorSpec) DeepCopyInto(out *InputMirrorSpec) { + *out = *in + out.SymphonyRef = in.SymphonyRef + out.SourceResource = in.SourceResource +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorSpec. +func (in *InputMirrorSpec) DeepCopy() *InputMirrorSpec { + if in == nil { + return nil + } + out := new(InputMirrorSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorStatus) DeepCopyInto(out *InputMirrorStatus) { + *out = *in + if in.Data != nil { + in, out := &in.Data, &out.Data + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.LastSyncTime != nil { + in, out := &in.LastSyncTime, &out.LastSyncTime + *out = (*in).DeepCopy() + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorStatus. +func (in *InputMirrorStatus) DeepCopy() *InputMirrorStatus { + if in == nil { + return nil + } + out := new(InputMirrorStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InputResource) DeepCopyInto(out *InputResource) { *out = *in @@ -245,6 +352,42 @@ func (in *Manifest) DeepCopy() *Manifest { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OverlayResourceRef) DeepCopyInto(out *OverlayResourceRef) { + *out = *in + out.Resource = in.Resource + if in.SyncInterval != nil { + in, out := &in.SyncInterval, &out.SyncInterval + *out = new(metav1.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceRef. +func (in *OverlayResourceRef) DeepCopy() *OverlayResourceRef { + if in == nil { + return nil + } + out := new(OverlayResourceRef) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OverlayResourceSelector) DeepCopyInto(out *OverlayResourceSelector) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceSelector. +func (in *OverlayResourceSelector) DeepCopy() *OverlayResourceSelector { + if in == nil { + return nil + } + out := new(OverlayResourceSelector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodOverrides) DeepCopyInto(out *PodOverrides) { *out = *in @@ -582,11 +725,6 @@ func (in *SymphonySpec) DeepCopyInto(out *SymphonySpec) { *out = make([]EnvVar, len(*in)) copy(*out, *in) } - if in.OverlayCredentials != nil { - in, out := &in.OverlayCredentials, &out.OverlayCredentials - *out = new(OverlayCredentials) - **out = **in - } if in.OverlayResourceRefs != nil { in, out := &in.OverlayResourceRefs, &out.OverlayResourceRefs *out = make([]OverlayResourceRef, len(*in)) @@ -857,162 +995,3 @@ func (in *Variation) DeepCopy() *Variation { in.DeepCopyInto(out) return out } - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *InputMirror) DeepCopyInto(out *InputMirror) { - *out = *in - out.TypeMeta = in.TypeMeta - in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - in.Spec.DeepCopyInto(&out.Spec) - in.Status.DeepCopyInto(&out.Status) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirror. -func (in *InputMirror) DeepCopy() *InputMirror { - if in == nil { - return nil - } - out := new(InputMirror) - in.DeepCopyInto(out) - return out -} - -// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *InputMirror) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *InputMirrorList) DeepCopyInto(out *InputMirrorList) { - *out = *in - out.TypeMeta = in.TypeMeta - in.ListMeta.DeepCopyInto(&out.ListMeta) - if in.Items != nil { - in, out := &in.Items, &out.Items - *out = make([]InputMirror, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorList. -func (in *InputMirrorList) DeepCopy() *InputMirrorList { - if in == nil { - return nil - } - out := new(InputMirrorList) - in.DeepCopyInto(out) - return out -} - -// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *InputMirrorList) DeepCopyObject() runtime.Object { - if c := in.DeepCopy(); c != nil { - return c - } - return nil -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *InputMirrorSpec) DeepCopyInto(out *InputMirrorSpec) { - *out = *in - out.SymphonyRef = in.SymphonyRef - out.SourceResource = in.SourceResource -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorSpec. -func (in *InputMirrorSpec) DeepCopy() *InputMirrorSpec { - if in == nil { - return nil - } - out := new(InputMirrorSpec) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *InputMirrorStatus) DeepCopyInto(out *InputMirrorStatus) { - *out = *in - if in.Data != nil { - in, out := &in.Data, &out.Data - *out = new(runtime.RawExtension) - (*in).DeepCopyInto(*out) - } - if in.LastSyncTime != nil { - in, out := &in.LastSyncTime, &out.LastSyncTime - *out = (*in).DeepCopy() - } - if in.Conditions != nil { - in, out := &in.Conditions, &out.Conditions - *out = make([]metav1.Condition, len(*in)) - for i := range *in { - (*in)[i].DeepCopyInto(&(*out)[i]) - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorStatus. -func (in *InputMirrorStatus) DeepCopy() *InputMirrorStatus { - if in == nil { - return nil - } - out := new(InputMirrorStatus) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *OverlayResourceSelector) DeepCopyInto(out *OverlayResourceSelector) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceSelector. -func (in *OverlayResourceSelector) DeepCopy() *OverlayResourceSelector { - if in == nil { - return nil - } - out := new(OverlayResourceSelector) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *OverlayCredentials) DeepCopyInto(out *OverlayCredentials) { - *out = *in - out.SecretRef = in.SecretRef -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayCredentials. -func (in *OverlayCredentials) DeepCopy() *OverlayCredentials { - if in == nil { - return nil - } - out := new(OverlayCredentials) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *OverlayResourceRef) DeepCopyInto(out *OverlayResourceRef) { - *out = *in - out.Resource = in.Resource - if in.SyncInterval != nil { - in, out := &in.SyncInterval, &out.SyncInterval - *out = new(metav1.Duration) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceRef. -func (in *OverlayResourceRef) DeepCopy() *OverlayResourceRef { - if in == nil { - return nil - } - out := new(OverlayResourceRef) - in.DeepCopyInto(out) - return out -} diff --git a/cmd/eno-controller/main.go b/cmd/eno-controller/main.go index 7e532b52..fbbccf9c 100644 --- a/cmd/eno-controller/main.go +++ b/cmd/eno-controller/main.go @@ -19,7 +19,6 @@ import ( v1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/controllers/composition" - "github.com/Azure/eno/internal/controllers/overlaysync" "github.com/Azure/eno/internal/controllers/resourceslice" "github.com/Azure/eno/internal/controllers/scheduling" "github.com/Azure/eno/internal/controllers/symphony" @@ -171,10 +170,8 @@ func runController() error { return fmt.Errorf("constructing symphony controller: %w", err) } - err = overlaysync.NewController(mgr) - if err != nil { - return fmt.Errorf("constructing overlay sync controller: %w", err) - } + // Note: OverlaySyncController has been moved to eno-reconciler + // where it uses the reconciler's --remote-kubeconfig for overlay access return mgr.Start(ctx) } diff --git a/cmd/eno-reconciler/main.go b/cmd/eno-reconciler/main.go index dcf411d1..a71d1ea7 100644 --- a/cmd/eno-reconciler/main.go +++ b/cmd/eno-reconciler/main.go @@ -15,6 +15,7 @@ import ( "github.com/Azure/eno/internal/cel" "github.com/Azure/eno/internal/controllers/liveness" + "github.com/Azure/eno/internal/controllers/overlaysync" "github.com/Azure/eno/internal/controllers/reconciliation" "github.com/Azure/eno/internal/flowcontrol" "github.com/Azure/eno/internal/k8s" @@ -130,5 +131,12 @@ func run() error { return fmt.Errorf("constructing reconciliation controller: %w", err) } + // OverlaySyncController uses remoteConfig (overlay) to watch resources + // and syncs them to InputMirrors on the underlay (mgr's client) + err = overlaysync.NewController(mgr, remoteConfig) + if err != nil { + return fmt.Errorf("constructing overlay sync controller: %w", err) + } + return mgr.Start(ctx) } diff --git a/internal/controllers/overlaysync/controller.go b/internal/controllers/overlaysync/controller.go index a8af90a3..afc30641 100644 --- a/internal/controllers/overlaysync/controller.go +++ b/internal/controllers/overlaysync/controller.go @@ -2,24 +2,22 @@ // from overlay clusters to the underlay as InputMirror resources. // // ARCHITECTURE: -// This controller uses a watch-based approach for real-time sync instead of polling: -// 1. Each Symphony with overlay refs gets a dedicated "overlay watcher" -// 2. The watcher sets up dynamic informers on the overlay cluster for each ref +// This controller runs in the eno-reconciler and uses the reconciler's existing +// overlay client (configured via --remote-kubeconfig) to watch and sync resources: +// 1. The controller is initialized with a pre-configured overlay REST config +// 2. It sets up dynamic informers on the overlay cluster for each Symphony's refs // 3. When a watched resource changes, the informer triggers a reconcile -// 4. The reconciler syncs the changed resource to the corresponding InputMirror +// 4. The reconciler syncs the changed resource to the corresponding InputMirror on the underlay // // SECURITY CONSIDERATIONS: -// - Overlay credentials are stored in Secrets and never logged -// - Secret access is restricted to the Symphony's namespace by default +// - Overlay credentials are managed by the reconciler's --remote-kubeconfig flag +// - No credentials stored in Symphony specs // - REST client has timeouts to prevent resource exhaustion -// - Cached watchers are invalidated on credential rotation // - Only specified resource types can be synced (no arbitrary access) package overlaysync import ( "context" - "crypto/sha256" - "encoding/hex" "encoding/json" "fmt" "sync" @@ -38,8 +36,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -80,15 +78,11 @@ var AllowedSyncKinds = map[schema.GroupKind]bool{ // Explicitly NOT allowing: Secret, ServiceAccount, etc. } -// overlayWatcher manages watch connections to a single overlay cluster. +// overlayWatcher manages watch connections to the overlay cluster. // It maintains dynamic informers for each resource type being watched. type overlayWatcher struct { mu sync.RWMutex - symphonyKey string // namespace/name of the symphony - credentialHash string // Hash of credentials to detect rotation - createdAt time.Time - // Dynamic client and informer factory for the overlay cluster dynamicClient dynamic.Interface informerFactory dynamicinformer.DynamicSharedInformerFactory @@ -107,11 +101,9 @@ type Controller struct { client client.Client scheme *runtime.Scheme - // overlayWatchers manages watch connections keyed by symphony namespace/name - overlayWatchers sync.Map - - // watcherCacheTTL determines how long overlay watchers are cached before refresh - watcherCacheTTL time.Duration + // overlayWatcher is the shared watcher for the overlay cluster + // initialized once from the overlay REST config passed to NewController + overlayWatcher *overlayWatcher // allowedKinds can be overridden for testing allowedKinds map[schema.GroupKind]bool @@ -121,17 +113,27 @@ type Controller struct { } // NewController creates a new OverlaySyncController and registers it with the manager. -func NewController(mgr ctrl.Manager) error { +// The overlayConfig is the REST config for the overlay cluster (typically from --remote-kubeconfig). +// If overlayConfig is nil, the controller will not sync any resources. +func NewController(mgr ctrl.Manager, overlayConfig *rest.Config) error { c := &Controller{ - client: mgr.GetClient(), - scheme: mgr.GetScheme(), - watcherCacheTTL: 30 * time.Minute, - allowedKinds: AllowedSyncKinds, + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + allowedKinds: AllowedSyncKinds, reconcileQueue: workqueue.NewTypedRateLimitingQueue( workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](), ), } + // Initialize the shared overlay watcher if config is provided + if overlayConfig != nil { + watcher, err := newOverlayWatcher(overlayConfig, c) + if err != nil { + return fmt.Errorf("creating overlay watcher: %w", err) + } + c.overlayWatcher = watcher + } + return ctrl.NewControllerManagedBy(mgr). For(&apiv1.Symphony{}). Owns(&apiv1.InputMirror{}). @@ -139,14 +141,38 @@ func NewController(mgr ctrl.Manager) error { Complete(c) } +// newOverlayWatcher creates a new overlay watcher from the given REST config +func newOverlayWatcher(config *rest.Config, controller *Controller) (*overlayWatcher, error) { + // Create dynamic client for informers + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("creating dynamic client: %w", err) + } + + // Create informer factory + stopCh := make(chan struct{}) + informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, WatchResyncPeriod) + + return &overlayWatcher{ + dynamicClient: dynamicClient, + informerFactory: informerFactory, + stopCh: stopCh, + watchedGVRs: make(map[schema.GroupVersionResource]struct{}), + controller: controller, + }, nil +} + func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := logr.FromContextOrDiscard(ctx) + // Skip if no overlay watcher configured (no --remote-kubeconfig) + if c.overlayWatcher == nil { + return ctrl.Result{}, nil + } + symphony := &apiv1.Symphony{} if err := c.client.Get(ctx, req.NamespacedName, symphony); err != nil { if errors.IsNotFound(err) { - // Symphony deleted, cleanup watcher - c.cleanupWatcher(req.String()) return ctrl.Result{}, nil } return ctrl.Result{}, err @@ -160,40 +186,24 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Skip if no overlay resource refs defined if len(symphony.Spec.OverlayResourceRefs) == 0 { - c.cleanupWatcher(req.String()) - return ctrl.Result{}, nil - } - - // Skip if no overlay credentials provided - if symphony.Spec.OverlayCredentials == nil { - logger.V(1).Info("symphony has overlay resource refs but no credentials, skipping") - c.cleanupWatcher(req.String()) return ctrl.Result{}, nil } // Handle deletion if symphony.DeletionTimestamp != nil { // InputMirrors will be garbage collected via owner references - c.cleanupWatcher(req.String()) return ctrl.Result{}, nil } - // Get or create overlay watcher with watches - watcher, err := c.getOrCreateOverlayWatcher(ctx, symphony) - if err != nil { - logger.Error(err, "failed to create overlay watcher") - return ctrl.Result{RequeueAfter: time.Minute}, nil - } - // Ensure informers are set up for all resource refs - if err := watcher.ensureInformers(ctx, symphony); err != nil { + if err := c.overlayWatcher.ensureInformers(ctx, symphony); err != nil { logger.Error(err, "failed to setup informers") return ctrl.Result{RequeueAfter: time.Minute}, nil } // Sync all overlay resource refs in parallel with bounded concurrency // This handles the initial sync and any changes detected by watches - c.syncOverlayResourcesParallel(ctx, symphony, watcher) + c.syncOverlayResourcesParallel(ctx, symphony, c.overlayWatcher) // Clean up InputMirrors for refs that no longer exist if err := c.cleanupOrphanedMirrors(ctx, symphony); err != nil { @@ -204,113 +214,6 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return ctrl.Result{RequeueAfter: FallbackSyncInterval}, nil } -// cleanupWatcher stops and removes the overlay watcher for a symphony -func (c *Controller) cleanupWatcher(key string) { - if val, ok := c.overlayWatchers.LoadAndDelete(key); ok { - watcher := val.(*overlayWatcher) - close(watcher.stopCh) - } -} - -// hashCredentials creates a SHA256 hash of credential data for change detection. -// This allows detecting credential rotation without storing the credentials. -func hashCredentials(data []byte) string { - h := sha256.Sum256(data) - return hex.EncodeToString(h[:]) -} - -// getOrCreateOverlayWatcher gets a cached overlay watcher or creates a new one. -// Security: Credentials are never logged, watchers have timeouts, cache invalidates on rotation. -func (c *Controller) getOrCreateOverlayWatcher(ctx context.Context, symphony *apiv1.Symphony) (*overlayWatcher, error) { - logger := logr.FromContextOrDiscard(ctx) - key := fmt.Sprintf("%s/%s", symphony.Namespace, symphony.Name) - - // Get the kubeconfig secret first to check for credential rotation - creds := symphony.Spec.OverlayCredentials - secret := &corev1.Secret{} - secretKey := types.NamespacedName{ - Name: creds.SecretRef.Name, - Namespace: creds.SecretRef.Namespace, - } - - // SECURITY: Only allow accessing secrets in the Symphony's namespace - if secretKey.Namespace == "" { - secretKey.Namespace = symphony.Namespace - } - if secretKey.Namespace != symphony.Namespace { - return nil, fmt.Errorf("security: credential secret must be in symphony namespace %q, got %q", - symphony.Namespace, secretKey.Namespace) - } - - if err := c.client.Get(ctx, secretKey, secret); err != nil { - return nil, fmt.Errorf("getting overlay credentials secret: %w", err) - } - - // Get kubeconfig data - NEVER log this - kubeconfigKey := creds.Key - if kubeconfigKey == "" { - kubeconfigKey = "kubeconfig" - } - kubeconfigData, ok := secret.Data[kubeconfigKey] - if !ok { - return nil, fmt.Errorf("kubeconfig key %q not found in secret", kubeconfigKey) - } - - // Hash credentials to detect rotation without storing them - credHash := hashCredentials(kubeconfigData) - - // Check cache - invalidate if credentials changed or TTL expired - if cached, ok := c.overlayWatchers.Load(key); ok { - watcher := cached.(*overlayWatcher) - if time.Since(watcher.createdAt) < c.watcherCacheTTL && watcher.credentialHash == credHash { - return watcher, nil - } - // Cache expired or credentials rotated - cleanup old watcher - logger.V(1).Info("invalidating cached overlay watcher", - "reason", map[bool]string{true: "credential_rotation", false: "ttl_expired"}[watcher.credentialHash != credHash]) - c.cleanupWatcher(key) - } - - // Create REST config from kubeconfig - restConfig, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigData) - if err != nil { - return nil, fmt.Errorf("parsing kubeconfig: %w", err) - } - - // SECURITY: Apply rate limiting and timeouts - restConfig.Timeout = overlayClientTimeout - restConfig.QPS = overlayClientQPS - restConfig.Burst = overlayClientBurst - restConfig.UserAgent = "eno-overlay-sync-controller" - - // Create dynamic client for informers - dynamicClient, err := dynamic.NewForConfig(restConfig) - if err != nil { - return nil, fmt.Errorf("creating dynamic client: %w", err) - } - - // Create informer factory - stopCh := make(chan struct{}) - informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, WatchResyncPeriod) - - watcher := &overlayWatcher{ - symphonyKey: key, - credentialHash: credHash, - createdAt: time.Now(), - dynamicClient: dynamicClient, - informerFactory: informerFactory, - stopCh: stopCh, - watchedGVRs: make(map[schema.GroupVersionResource]struct{}), - controller: c, - } - - // Cache the watcher - c.overlayWatchers.Store(key, watcher) - logger.V(1).Info("created overlay watcher") - - return watcher, nil -} - // ensureInformers sets up informers for all resource refs in the symphony func (w *overlayWatcher) ensureInformers(ctx context.Context, symphony *apiv1.Symphony) error { logger := logr.FromContextOrDiscard(ctx) diff --git a/internal/controllers/overlaysync/controller_test.go b/internal/controllers/overlaysync/controller_test.go index 9712495d..ccaff8f2 100644 --- a/internal/controllers/overlaysync/controller_test.go +++ b/internal/controllers/overlaysync/controller_test.go @@ -3,7 +3,6 @@ package overlaysync import ( "context" "testing" - "time" apiv1 "github.com/Azure/eno/api/v1" "github.com/stretchr/testify/assert" @@ -12,8 +11,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/workqueue" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -83,11 +80,10 @@ func TestReconcile_NoOverlayRefs(t *testing.T) { Build() controller := &Controller{ - client: client, - scheme: scheme, - watcherCacheTTL: 30 * time.Minute, - allowedKinds: AllowedSyncKinds, - reconcileQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[ctrl.Request]()), + client: client, + scheme: scheme, + allowedKinds: AllowedSyncKinds, + // overlayWatcher nil - simulates no overlay config provided } result, err := controller.Reconcile(context.Background(), reconcile.Request{ @@ -101,7 +97,7 @@ func TestReconcile_NoOverlayRefs(t *testing.T) { assert.Equal(t, reconcile.Result{}, result) } -func TestReconcile_NoCredentials(t *testing.T) { +func TestReconcile_NoOverlayWatcher(t *testing.T) { scheme := runtime.NewScheme() require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) require.NoError(t, corev1.AddToScheme(scheme)) @@ -122,7 +118,6 @@ func TestReconcile_NoCredentials(t *testing.T) { }, }, }, - // No OverlayCredentials }, } @@ -132,11 +127,10 @@ func TestReconcile_NoCredentials(t *testing.T) { Build() controller := &Controller{ - client: client, - scheme: scheme, - watcherCacheTTL: 30 * time.Minute, - allowedKinds: AllowedSyncKinds, - reconcileQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[ctrl.Request]()), + client: client, + scheme: scheme, + allowedKinds: AllowedSyncKinds, + // overlayWatcher nil - no overlay config provided to eno-reconciler } result, err := controller.Reconcile(context.Background(), reconcile.Request{ @@ -147,7 +141,7 @@ func TestReconcile_NoCredentials(t *testing.T) { }) require.NoError(t, err) - // Should return empty result since no credentials + // Should return empty result since no overlay watcher available assert.Equal(t, reconcile.Result{}, result) } @@ -161,11 +155,9 @@ func TestReconcile_SymphonyNotFound(t *testing.T) { Build() controller := &Controller{ - client: client, - scheme: scheme, - watcherCacheTTL: 30 * time.Minute, - allowedKinds: AllowedSyncKinds, - reconcileQueue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[ctrl.Request]()), + client: client, + scheme: scheme, + allowedKinds: AllowedSyncKinds, } result, err := controller.Reconcile(context.Background(), reconcile.Request{ From 9381b029bcfaf7be986ed4377f7fc9916c7c9198 Mon Sep 17 00:00:00 2001 From: david kydd Date: Thu, 18 Dec 2025 15:16:54 +1300 Subject: [PATCH 6/9] prune comment --- cmd/eno-controller/main.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/eno-controller/main.go b/cmd/eno-controller/main.go index fbbccf9c..f0135384 100644 --- a/cmd/eno-controller/main.go +++ b/cmd/eno-controller/main.go @@ -170,9 +170,6 @@ func runController() error { return fmt.Errorf("constructing symphony controller: %w", err) } - // Note: OverlaySyncController has been moved to eno-reconciler - // where it uses the reconciler's --remote-kubeconfig for overlay access - return mgr.Start(ctx) } From 6d1c9fdbeda4381ab3bc18d90d04c79020345ab3 Mon Sep 17 00:00:00 2001 From: david kydd Date: Fri, 19 Dec 2025 09:50:22 +1300 Subject: [PATCH 7/9] align on "remote" rather than "overlay" nomenclature --- api/v1/inputmirror.go | 22 +-- api/v1/symphony.go | 6 +- api/v1/zz_generated.deepcopy.go | 22 +-- cmd/eno-reconciler/main.go | 10 +- .../{overlaysync => remotesync}/controller.go | 146 +++++++++--------- .../controller_test.go | 18 +-- 6 files changed, 112 insertions(+), 112 deletions(-) rename internal/controllers/{overlaysync => remotesync}/controller.go (76%) rename internal/controllers/{overlaysync => remotesync}/controller_test.go (90%) diff --git a/api/v1/inputmirror.go b/api/v1/inputmirror.go index ffe0e1de..aa9c8377 100644 --- a/api/v1/inputmirror.go +++ b/api/v1/inputmirror.go @@ -13,8 +13,8 @@ type InputMirrorList struct { Items []InputMirror `json:"items"` } -// InputMirror stores a copy of a resource from an overlay cluster. -// It is created and managed by the OverlaySyncController based on Symphony.spec.overlayResourceRefs. +// InputMirror stores a copy of a resource from a remote cluster. +// It is created and managed by the RemoteSyncController based on Symphony.spec.remoteResourceRefs. // Compositions can bind to InputMirrors just like any other resource. // // +kubebuilder:object:root=true @@ -37,8 +37,8 @@ type InputMirrorSpec struct { // SymphonyRef points to the owning Symphony SymphonyRef corev1.LocalObjectReference `json:"symphonyRef"` - // SourceResource describes what resource to sync from the overlay - SourceResource OverlayResourceSelector `json:"sourceResource"` + // SourceResource describes what resource to sync from the remote cluster + SourceResource RemoteResourceSelector `json:"sourceResource"` } type InputMirrorStatus struct { @@ -59,8 +59,8 @@ type InputMirrorStatus struct { Conditions []metav1.Condition `json:"conditions,omitempty"` } -// OverlayResourceSelector describes a resource to sync from an overlay cluster -type OverlayResourceSelector struct { +// RemoteResourceSelector describes a resource to sync from a remote cluster +type RemoteResourceSelector struct { // API Group of the resource (empty string for core API group) // +optional Group string `json:"group,omitempty"` @@ -79,21 +79,21 @@ type OverlayResourceSelector struct { Namespace string `json:"namespace,omitempty"` } -// OverlayResourceRef defines a resource to sync from an overlay cluster -type OverlayResourceRef struct { +// RemoteResourceRef defines a resource to sync from a remote cluster +type RemoteResourceRef struct { // Key that will be used to reference this input in Composition bindings. // This key maps to an auto-created InputMirror resource. Key string `json:"key"` - // Resource specifies what to fetch from the overlay cluster - Resource OverlayResourceSelector `json:"resource"` + // Resource specifies what to fetch from the remote cluster + Resource RemoteResourceSelector `json:"resource"` // SyncInterval determines how often to re-sync the resource. // +kubebuilder:default="5m" // +optional SyncInterval *metav1.Duration `json:"syncInterval,omitempty"` - // Optional indicates that synthesis can proceed if this resource doesn't exist in the overlay. + // Optional indicates that synthesis can proceed if this resource doesn't exist in the remote cluster. // +kubebuilder:default=false // +optional Optional bool `json:"optional,omitempty"` diff --git a/api/v1/symphony.go b/api/v1/symphony.go index 79a1a174..66f61f12 100644 --- a/api/v1/symphony.go +++ b/api/v1/symphony.go @@ -40,12 +40,12 @@ type SymphonySpec struct { // +kubebuilder:validation:MaxItems:=50 SynthesisEnv []EnvVar `json:"synthesisEnv,omitempty"` // deprecated synthesis env should always be variation scoped. - // OverlayResourceRefs specifies resources to sync from the overlay cluster. + // RemoteResourceRefs specifies resources to sync from the remote cluster. // Each ref results in an InputMirror being created that can be bound as an input. - // The overlay cluster is accessed via the eno-reconciler's --remote-kubeconfig. + // The remote cluster is accessed via the eno-reconciler's --remote-kubeconfig. // +optional // +kubebuilder:validation:MaxItems:=20 - OverlayResourceRefs []OverlayResourceRef `json:"overlayResourceRefs,omitempty"` + RemoteResourceRefs []RemoteResourceRef `json:"remoteResourceRefs,omitempty"` } type SymphonyStatus struct { diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index b935e7ed..a8bc459e 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -353,7 +353,7 @@ func (in *Manifest) DeepCopy() *Manifest { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *OverlayResourceRef) DeepCopyInto(out *OverlayResourceRef) { +func (in *RemoteResourceRef) DeepCopyInto(out *RemoteResourceRef) { *out = *in out.Resource = in.Resource if in.SyncInterval != nil { @@ -363,27 +363,27 @@ func (in *OverlayResourceRef) DeepCopyInto(out *OverlayResourceRef) { } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceRef. -func (in *OverlayResourceRef) DeepCopy() *OverlayResourceRef { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceRef. +func (in *RemoteResourceRef) DeepCopy() *RemoteResourceRef { if in == nil { return nil } - out := new(OverlayResourceRef) + out := new(RemoteResourceRef) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *OverlayResourceSelector) DeepCopyInto(out *OverlayResourceSelector) { +func (in *RemoteResourceSelector) DeepCopyInto(out *RemoteResourceSelector) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OverlayResourceSelector. -func (in *OverlayResourceSelector) DeepCopy() *OverlayResourceSelector { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceSelector. +func (in *RemoteResourceSelector) DeepCopy() *RemoteResourceSelector { if in == nil { return nil } - out := new(OverlayResourceSelector) + out := new(RemoteResourceSelector) in.DeepCopyInto(out) return out } @@ -725,9 +725,9 @@ func (in *SymphonySpec) DeepCopyInto(out *SymphonySpec) { *out = make([]EnvVar, len(*in)) copy(*out, *in) } - if in.OverlayResourceRefs != nil { - in, out := &in.OverlayResourceRefs, &out.OverlayResourceRefs - *out = make([]OverlayResourceRef, len(*in)) + if in.RemoteResourceRefs != nil { + in, out := &in.RemoteResourceRefs, &out.RemoteResourceRefs + *out = make([]RemoteResourceRef, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } diff --git a/cmd/eno-reconciler/main.go b/cmd/eno-reconciler/main.go index a71d1ea7..5445211e 100644 --- a/cmd/eno-reconciler/main.go +++ b/cmd/eno-reconciler/main.go @@ -15,8 +15,8 @@ import ( "github.com/Azure/eno/internal/cel" "github.com/Azure/eno/internal/controllers/liveness" - "github.com/Azure/eno/internal/controllers/overlaysync" "github.com/Azure/eno/internal/controllers/reconciliation" + "github.com/Azure/eno/internal/controllers/remotesync" "github.com/Azure/eno/internal/flowcontrol" "github.com/Azure/eno/internal/k8s" "github.com/Azure/eno/internal/logging" @@ -131,11 +131,11 @@ func run() error { return fmt.Errorf("constructing reconciliation controller: %w", err) } - // OverlaySyncController uses remoteConfig (overlay) to watch resources - // and syncs them to InputMirrors on the underlay (mgr's client) - err = overlaysync.NewController(mgr, remoteConfig) + // RemoteSyncController uses remoteConfig to watch resources on the remote cluster + // and syncs them to InputMirrors on the local cluster (mgr's client) + err = remotesync.NewController(mgr, remoteConfig) if err != nil { - return fmt.Errorf("constructing overlay sync controller: %w", err) + return fmt.Errorf("constructing remote sync controller: %w", err) } return mgr.Start(ctx) diff --git a/internal/controllers/overlaysync/controller.go b/internal/controllers/remotesync/controller.go similarity index 76% rename from internal/controllers/overlaysync/controller.go rename to internal/controllers/remotesync/controller.go index afc30641..7eedfe01 100644 --- a/internal/controllers/overlaysync/controller.go +++ b/internal/controllers/remotesync/controller.go @@ -1,20 +1,20 @@ -// Package overlaysync implements the OverlaySyncController which syncs resources -// from overlay clusters to the underlay as InputMirror resources. +// Package remotesync implements the RemoteSyncController which syncs resources +// from remote clusters to the local cluster as InputMirror resources. // // ARCHITECTURE: // This controller runs in the eno-reconciler and uses the reconciler's existing -// overlay client (configured via --remote-kubeconfig) to watch and sync resources: -// 1. The controller is initialized with a pre-configured overlay REST config -// 2. It sets up dynamic informers on the overlay cluster for each Symphony's refs +// remote client (configured via --remote-kubeconfig) to watch and sync resources: +// 1. The controller is initialized with a pre-configured remote REST config +// 2. It sets up dynamic informers on the remote cluster for each Symphony's refs // 3. When a watched resource changes, the informer triggers a reconcile -// 4. The reconciler syncs the changed resource to the corresponding InputMirror on the underlay +// 4. The reconciler syncs the changed resource to the corresponding InputMirror on the local cluster // // SECURITY CONSIDERATIONS: -// - Overlay credentials are managed by the reconciler's --remote-kubeconfig flag +// - Remote credentials are managed by the reconciler's --remote-kubeconfig flag // - No credentials stored in Symphony specs // - REST client has timeouts to prevent resource exhaustion // - Only specified resource types can be synced (no arbitrary access) -package overlaysync +package remotesync import ( "context" @@ -56,21 +56,21 @@ const ( WatchResyncPeriod = 10 * time.Minute // FinalizerName is the finalizer added to InputMirrors - FinalizerName = "eno.azure.io/overlay-sync" + FinalizerName = "eno.azure.io/remote-sync" // Client timeout settings for security - overlayClientTimeout = 30 * time.Second - overlayClientQPS = 5 - overlayClientBurst = 10 + remoteClientTimeout = 30 * time.Second + remoteClientQPS = 5 + remoteClientBurst = 10 - // maxSyncConcurrency limits parallel overlay resource fetches per Symphony. - // This prevents overwhelming the overlay cluster's API server while still + // maxSyncConcurrency limits parallel remote resource fetches per Symphony. + // This prevents overwhelming the remote cluster's API server while still // providing significant speedup over sequential syncing. // With 100 refs at ~50ms each: sequential = 5s, parallel (10) = 500ms maxSyncConcurrency = 10 ) -// AllowedSyncKinds defines which resource kinds can be synced from overlay. +// AllowedSyncKinds defines which resource kinds can be synced from remote. // This is a security control to prevent syncing sensitive resources. var AllowedSyncKinds = map[schema.GroupKind]bool{ {Group: "", Kind: "ConfigMap"}: true, @@ -78,12 +78,12 @@ var AllowedSyncKinds = map[schema.GroupKind]bool{ // Explicitly NOT allowing: Secret, ServiceAccount, etc. } -// overlayWatcher manages watch connections to the overlay cluster. +// remoteWatcher manages watch connections to the remote cluster. // It maintains dynamic informers for each resource type being watched. -type overlayWatcher struct { +type remoteWatcher struct { mu sync.RWMutex - // Dynamic client and informer factory for the overlay cluster + // Dynamic client and informer factory for the remote cluster dynamicClient dynamic.Interface informerFactory dynamicinformer.DynamicSharedInformerFactory stopCh chan struct{} @@ -95,15 +95,15 @@ type overlayWatcher struct { controller *Controller } -// Controller reconciles Symphonies with overlay resource refs, syncing resources -// from overlay clusters to InputMirror resources on the underlay. +// Controller reconciles Symphonies with remote resource refs, syncing resources +// from remote clusters to InputMirror resources on the local cluster. type Controller struct { client client.Client scheme *runtime.Scheme - // overlayWatcher is the shared watcher for the overlay cluster - // initialized once from the overlay REST config passed to NewController - overlayWatcher *overlayWatcher + // remoteWatcher is the shared watcher for the remote cluster + // initialized once from the remote REST config passed to NewController + remoteWatcher *remoteWatcher // allowedKinds can be overridden for testing allowedKinds map[schema.GroupKind]bool @@ -112,10 +112,10 @@ type Controller struct { reconcileQueue workqueue.TypedRateLimitingInterface[ctrl.Request] } -// NewController creates a new OverlaySyncController and registers it with the manager. -// The overlayConfig is the REST config for the overlay cluster (typically from --remote-kubeconfig). -// If overlayConfig is nil, the controller will not sync any resources. -func NewController(mgr ctrl.Manager, overlayConfig *rest.Config) error { +// NewController creates a new RemoteSyncController and registers it with the manager. +// The remoteConfig is the REST config for the remote cluster (typically from --remote-kubeconfig). +// If remoteConfig is nil, the controller will not sync any resources. +func NewController(mgr ctrl.Manager, remoteConfig *rest.Config) error { c := &Controller{ client: mgr.GetClient(), scheme: mgr.GetScheme(), @@ -125,24 +125,24 @@ func NewController(mgr ctrl.Manager, overlayConfig *rest.Config) error { ), } - // Initialize the shared overlay watcher if config is provided - if overlayConfig != nil { - watcher, err := newOverlayWatcher(overlayConfig, c) + // Initialize the shared remote watcher if config is provided + if remoteConfig != nil { + watcher, err := newRemoteWatcher(remoteConfig, c) if err != nil { - return fmt.Errorf("creating overlay watcher: %w", err) + return fmt.Errorf("creating remote watcher: %w", err) } - c.overlayWatcher = watcher + c.remoteWatcher = watcher } return ctrl.NewControllerManagedBy(mgr). For(&apiv1.Symphony{}). Owns(&apiv1.InputMirror{}). - WithLogConstructor(manager.NewLogConstructor(mgr, "overlaySyncController")). + WithLogConstructor(manager.NewLogConstructor(mgr, "remoteSyncController")). Complete(c) } -// newOverlayWatcher creates a new overlay watcher from the given REST config -func newOverlayWatcher(config *rest.Config, controller *Controller) (*overlayWatcher, error) { +// newRemoteWatcher creates a new remote watcher from the given REST config +func newRemoteWatcher(config *rest.Config, controller *Controller) (*remoteWatcher, error) { // Create dynamic client for informers dynamicClient, err := dynamic.NewForConfig(config) if err != nil { @@ -153,7 +153,7 @@ func newOverlayWatcher(config *rest.Config, controller *Controller) (*overlayWat stopCh := make(chan struct{}) informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, WatchResyncPeriod) - return &overlayWatcher{ + return &remoteWatcher{ dynamicClient: dynamicClient, informerFactory: informerFactory, stopCh: stopCh, @@ -165,8 +165,8 @@ func newOverlayWatcher(config *rest.Config, controller *Controller) (*overlayWat func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := logr.FromContextOrDiscard(ctx) - // Skip if no overlay watcher configured (no --remote-kubeconfig) - if c.overlayWatcher == nil { + // Skip if no remote watcher configured (no --remote-kubeconfig) + if c.remoteWatcher == nil { return ctrl.Result{}, nil } @@ -184,8 +184,8 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu ) ctx = logr.NewContext(ctx, logger) - // Skip if no overlay resource refs defined - if len(symphony.Spec.OverlayResourceRefs) == 0 { + // Skip if no remote resource refs defined + if len(symphony.Spec.RemoteResourceRefs) == 0 { return ctrl.Result{}, nil } @@ -196,14 +196,14 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } // Ensure informers are set up for all resource refs - if err := c.overlayWatcher.ensureInformers(ctx, symphony); err != nil { + if err := c.remoteWatcher.ensureInformers(ctx, symphony); err != nil { logger.Error(err, "failed to setup informers") return ctrl.Result{RequeueAfter: time.Minute}, nil } - // Sync all overlay resource refs in parallel with bounded concurrency + // Sync all remote resource refs in parallel with bounded concurrency // This handles the initial sync and any changes detected by watches - c.syncOverlayResourcesParallel(ctx, symphony, c.overlayWatcher) + c.syncRemoteResourcesParallel(ctx, symphony, c.remoteWatcher) // Clean up InputMirrors for refs that no longer exist if err := c.cleanupOrphanedMirrors(ctx, symphony); err != nil { @@ -215,14 +215,14 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu } // ensureInformers sets up informers for all resource refs in the symphony -func (w *overlayWatcher) ensureInformers(ctx context.Context, symphony *apiv1.Symphony) error { +func (w *remoteWatcher) ensureInformers(ctx context.Context, symphony *apiv1.Symphony) error { logger := logr.FromContextOrDiscard(ctx) w.mu.Lock() defer w.mu.Unlock() // Track which GVRs we need - neededGVRs := make(map[schema.GroupVersionResource][]apiv1.OverlayResourceRef) - for _, ref := range symphony.Spec.OverlayResourceRefs { + neededGVRs := make(map[schema.GroupVersionResource][]apiv1.RemoteResourceRef) + for _, ref := range symphony.Spec.RemoteResourceRefs { gvr := schema.GroupVersionResource{ Group: ref.Resource.Group, Version: ref.Resource.Version, @@ -275,7 +275,7 @@ func (w *overlayWatcher) ensureInformers(ctx context.Context, symphony *apiv1.Sy } // enqueueReconcile checks if the object matches any refs and enqueues a reconcile if so -func (w *overlayWatcher) enqueueReconcile(ctx context.Context, symphony *apiv1.Symphony, refs []apiv1.OverlayResourceRef, obj interface{}) { +func (w *remoteWatcher) enqueueReconcile(ctx context.Context, symphony *apiv1.Symphony, refs []apiv1.RemoteResourceRef, obj interface{}) { logger := logr.FromContextOrDiscard(ctx) u, ok := obj.(*unstructured.Unstructured) @@ -294,7 +294,7 @@ func (w *overlayWatcher) enqueueReconcile(ctx context.Context, symphony *apiv1.S // Check if this object matches any of our refs for _, ref := range refs { if matchesRef(u, ref) { - logger.V(2).Info("overlay resource changed, enqueueing reconcile", + logger.V(2).Info("remote resource changed, enqueueing reconcile", "resource", u.GetName(), "namespace", u.GetNamespace(), "key", ref.Key, @@ -311,8 +311,8 @@ func (w *overlayWatcher) enqueueReconcile(ctx context.Context, symphony *apiv1.S } } -// matchesRef checks if an unstructured object matches an overlay resource ref -func matchesRef(obj *unstructured.Unstructured, ref apiv1.OverlayResourceRef) bool { +// matchesRef checks if an unstructured object matches a remote resource ref +func matchesRef(obj *unstructured.Unstructured, ref apiv1.RemoteResourceRef) bool { return obj.GetName() == ref.Resource.Name && obj.GetNamespace() == ref.Resource.Namespace } @@ -329,8 +329,8 @@ func pluralize(kind string) string { return lower + "s" } -// getResource fetches a resource from the overlay cluster using the dynamic client -func (w *overlayWatcher) getResource(ctx context.Context, ref apiv1.OverlayResourceRef) (*unstructured.Unstructured, error) { +// getResource fetches a resource from the remote cluster using the dynamic client +func (w *remoteWatcher) getResource(ctx context.Context, ref apiv1.RemoteResourceRef) (*unstructured.Unstructured, error) { gvr := schema.GroupVersionResource{ Group: ref.Resource.Group, Version: ref.Resource.Version, @@ -349,22 +349,22 @@ func (w *overlayWatcher) getResource(ctx context.Context, ref apiv1.OverlayResou return obj, err } -// syncResult holds the result of syncing a single overlay resource +// syncResult holds the result of syncing a single remote resource type syncResult struct { key string err error } -// syncOverlayResourcesParallel syncs all overlay resource refs in parallel with bounded concurrency. +// syncRemoteResourcesParallel syncs all remote resource refs in parallel with bounded concurrency. // This reduces reconcile latency from O(n * latency) to O(n/concurrency * latency). // For example, with 100 refs at ~50ms each: sequential = 5s, parallel (10) = ~500ms. -func (c *Controller) syncOverlayResourcesParallel( +func (c *Controller) syncRemoteResourcesParallel( ctx context.Context, symphony *apiv1.Symphony, - watcher *overlayWatcher, + watcher *remoteWatcher, ) { logger := logr.FromContextOrDiscard(ctx) - refs := symphony.Spec.OverlayResourceRefs + refs := symphony.Spec.RemoteResourceRefs if len(refs) == 0 { return @@ -390,7 +390,7 @@ func (c *Controller) syncOverlayResourcesParallel( return nil } - err := c.syncOverlayResource(ctx, symphony, watcher, ref) + err := c.syncRemoteResource(ctx, symphony, watcher, ref) results <- syncResult{key: ref.Key, err: err} return nil // Don't propagate errors - we handle them individually }) @@ -405,44 +405,44 @@ func (c *Controller) syncOverlayResourcesParallel( for result := range results { if result.err != nil { - logger.Error(result.err, "failed to sync overlay resource", "key", result.key) + logger.Error(result.err, "failed to sync remote resource", "key", result.key) failCount++ continue } successCount++ } - logger.V(1).Info("completed parallel overlay sync", + logger.V(1).Info("completed parallel remote sync", "total", len(refs), "success", successCount, "failed", failCount, ) } -// syncOverlayResource syncs a single overlay resource to an InputMirror -func (c *Controller) syncOverlayResource( +// syncRemoteResource syncs a single remote resource to an InputMirror +func (c *Controller) syncRemoteResource( ctx context.Context, symphony *apiv1.Symphony, - watcher *overlayWatcher, - ref apiv1.OverlayResourceRef, + watcher *remoteWatcher, + ref apiv1.RemoteResourceRef, ) error { logger := logr.FromContextOrDiscard(ctx).WithValues("key", ref.Key, "resourceName", ref.Resource.Name) // SECURITY: Validate the resource kind is allowed to be synced gk := schema.GroupKind{Group: ref.Resource.Group, Kind: ref.Resource.Kind} if !c.allowedKinds[gk] { - return fmt.Errorf("security: resource kind %q is not allowed to be synced from overlay", gk.String()) + return fmt.Errorf("security: resource kind %q is not allowed to be synced from remote", gk.String()) } - // Fetch from overlay using the watcher's dynamic client + // Fetch from remote using the watcher's dynamic client obj, err := watcher.getResource(ctx, ref) if err != nil { if errors.IsNotFound(err) && ref.Optional { - logger.V(1).Info("optional overlay resource not found, skipping") + logger.V(1).Info("optional remote resource not found, skipping") // Update InputMirror to reflect missing state return c.updateMirrorMissing(ctx, symphony, ref) } - return fmt.Errorf("getting overlay resource: %w", err) + return fmt.Errorf("getting remote resource: %w", err) } // Create/Update InputMirror @@ -482,13 +482,13 @@ func (c *Controller) syncOverlayResource( mirror.Status.SyncGeneration = obj.GetResourceVersion() // Update conditions - setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from overlay cluster") + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from remote cluster") if err := c.client.Status().Update(ctx, mirror); err != nil { return fmt.Errorf("updating InputMirror status: %w", err) } - logger.V(1).Info("synced overlay resource", "result", result, "mirrorName", mirrorName) + logger.V(1).Info("synced remote resource", "result", result, "mirrorName", mirrorName) return nil } @@ -496,7 +496,7 @@ func (c *Controller) syncOverlayResource( func (c *Controller) updateMirrorMissing( ctx context.Context, symphony *apiv1.Symphony, - ref apiv1.OverlayResourceRef, + ref apiv1.RemoteResourceRef, ) error { mirrorName := inputMirrorName(symphony.Name, ref.Key) mirror := &apiv1.InputMirror{} @@ -510,7 +510,7 @@ func (c *Controller) updateMirrorMissing( } // Update condition to reflect missing state - setSyncedCondition(mirror, false, "SourceNotFound", "Optional source resource not found in overlay") + setSyncedCondition(mirror, false, "SourceNotFound", "Optional source resource not found in remote cluster") mirror.Status.Data = nil return c.client.Status().Update(ctx, mirror) @@ -534,7 +534,7 @@ func (c *Controller) cleanupOrphanedMirrors(ctx context.Context, symphony *apiv1 // Build set of expected mirror names expected := make(map[string]struct{}) - for _, ref := range symphony.Spec.OverlayResourceRefs { + for _, ref := range symphony.Spec.RemoteResourceRefs { expected[inputMirrorName(symphony.Name, ref.Key)] = struct{}{} } diff --git a/internal/controllers/overlaysync/controller_test.go b/internal/controllers/remotesync/controller_test.go similarity index 90% rename from internal/controllers/overlaysync/controller_test.go rename to internal/controllers/remotesync/controller_test.go index ccaff8f2..fcc8e128 100644 --- a/internal/controllers/overlaysync/controller_test.go +++ b/internal/controllers/remotesync/controller_test.go @@ -1,4 +1,4 @@ -package overlaysync +package remotesync import ( "context" @@ -59,7 +59,7 @@ func TestSetSyncedCondition(t *testing.T) { assert.Equal(t, "SyncFailed", mirror.Status.Conditions[0].Reason) } -func TestReconcile_NoOverlayRefs(t *testing.T) { +func TestReconcile_NoRemoteRefs(t *testing.T) { scheme := runtime.NewScheme() require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) require.NoError(t, corev1.AddToScheme(scheme)) @@ -70,7 +70,7 @@ func TestReconcile_NoOverlayRefs(t *testing.T) { Namespace: "test-ns", }, Spec: apiv1.SymphonySpec{ - // No OverlayResourceRefs + // No RemoteResourceRefs }, } @@ -83,7 +83,7 @@ func TestReconcile_NoOverlayRefs(t *testing.T) { client: client, scheme: scheme, allowedKinds: AllowedSyncKinds, - // overlayWatcher nil - simulates no overlay config provided + // remoteWatcher nil - simulates no remote config provided } result, err := controller.Reconcile(context.Background(), reconcile.Request{ @@ -97,7 +97,7 @@ func TestReconcile_NoOverlayRefs(t *testing.T) { assert.Equal(t, reconcile.Result{}, result) } -func TestReconcile_NoOverlayWatcher(t *testing.T) { +func TestReconcile_NoRemoteWatcher(t *testing.T) { scheme := runtime.NewScheme() require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) require.NoError(t, corev1.AddToScheme(scheme)) @@ -108,10 +108,10 @@ func TestReconcile_NoOverlayWatcher(t *testing.T) { Namespace: "test-ns", }, Spec: apiv1.SymphonySpec{ - OverlayResourceRefs: []apiv1.OverlayResourceRef{ + RemoteResourceRefs: []apiv1.RemoteResourceRef{ { Key: "test", - Resource: apiv1.OverlayResourceSelector{ + Resource: apiv1.RemoteResourceSelector{ Kind: "ConfigMap", Version: "v1", Name: "test-cm", @@ -130,7 +130,7 @@ func TestReconcile_NoOverlayWatcher(t *testing.T) { client: client, scheme: scheme, allowedKinds: AllowedSyncKinds, - // overlayWatcher nil - no overlay config provided to eno-reconciler + // remoteWatcher nil - no remote config provided to eno-reconciler } result, err := controller.Reconcile(context.Background(), reconcile.Request{ @@ -141,7 +141,7 @@ func TestReconcile_NoOverlayWatcher(t *testing.T) { }) require.NoError(t, err) - // Should return empty result since no overlay watcher available + // Should return empty result since no remote watcher available assert.Equal(t, reconcile.Result{}, result) } From 14bf1c78e684ee98ee8ca2626596f1980761b365 Mon Sep 17 00:00:00 2001 From: david kydd Date: Tue, 30 Dec 2025 08:01:13 +1300 Subject: [PATCH 8/9] fix: properly lowercase kind names in pluralize function The pluralize function was only lowercasing the first letter of kind names, resulting in resource names like 'configMaps' instead of 'configmaps'. This caused the dynamic client to fail with 'server could not find the requested resource' because Kubernetes expects fully lowercased resource names. The fix uses strings.ToLower to properly convert the entire kind name before pluralizing. --- internal/controllers/remotesync/controller.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/controllers/remotesync/controller.go b/internal/controllers/remotesync/controller.go index 7eedfe01..f4daa4b6 100644 --- a/internal/controllers/remotesync/controller.go +++ b/internal/controllers/remotesync/controller.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync" "time" @@ -319,7 +320,8 @@ func matchesRef(obj *unstructured.Unstructured, ref apiv1.RemoteResourceRef) boo // pluralize converts a Kind to its plural resource name (simple heuristic) func pluralize(kind string) string { - lower := string(kind[0]+32) + kind[1:] // lowercase first letter + // Use strings.ToLower to properly lowercase the entire string + lower := strings.ToLower(kind) if lower[len(lower)-1] == 's' { return lower + "es" } From db7b4d961afb2d8342e8c9815fbfdd839d9d43ba Mon Sep 17 00:00:00 2001 From: david kydd Date: Tue, 30 Dec 2025 12:08:08 +1300 Subject: [PATCH 9/9] debug for eno-reconciler watch based mechanism --- .../config/crd/eno.azure.io_compositions.yaml | 2 +- .../config/crd/eno.azure.io_inputmirrors.yaml | 59 +++++++++------ .../crd/eno.azure.io_resourceslices.yaml | 2 +- .../config/crd/eno.azure.io_symphonies.yaml | 41 +++-------- .../config/crd/eno.azure.io_synthesizers.yaml | 2 +- api/v1/zz_generated.deepcopy.go | 72 +++++++++---------- internal/controllers/remotesync/controller.go | 36 ++++++---- 7 files changed, 105 insertions(+), 109 deletions(-) diff --git a/api/v1/config/crd/eno.azure.io_compositions.yaml b/api/v1/config/crd/eno.azure.io_compositions.yaml index c6fc4d42..f2760304 100644 --- a/api/v1/config/crd/eno.azure.io_compositions.yaml +++ b/api/v1/config/crd/eno.azure.io_compositions.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: compositions.eno.azure.io spec: group: eno.azure.io diff --git a/api/v1/config/crd/eno.azure.io_inputmirrors.yaml b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml index 7ec44326..2a2ccff9 100644 --- a/api/v1/config/crd/eno.azure.io_inputmirrors.yaml +++ b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: inputmirrors.eno.azure.io spec: group: eno.azure.io @@ -28,8 +28,8 @@ spec: schema: openAPIV3Schema: description: |- - InputMirror stores a copy of a resource from an overlay cluster. - It is created and managed by the OverlaySyncController based on Symphony.spec.overlayResourceRefs. + InputMirror stores a copy of a resource from a remote cluster. + It is created and managed by the RemoteSyncController based on Symphony.spec.remoteResourceRefs. Compositions can bind to InputMirrors just like any other resource. properties: apiVersion: @@ -37,6 +37,7 @@ spec: APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources type: string kind: description: |- @@ -44,6 +45,7 @@ spec: Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds type: string metadata: type: object @@ -52,18 +54,9 @@ spec: key: description: Key matches the Symphony's overlayResourceRef key type: string - symphonyRef: - description: SymphonyRef points to the owning Symphony - properties: - name: - description: Name of the referent. - type: string - required: - - name - type: object sourceResource: - description: SourceResource describes what resource to sync from - the overlay + description: SourceResource describes what resource to sync from the + remote cluster properties: group: description: API Group of the resource (empty string for core @@ -87,6 +80,20 @@ spec: - name - version type: object + symphonyRef: + description: SymphonyRef points to the owning Symphony + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + type: object + x-kubernetes-map-type: atomic required: - key - sourceResource @@ -101,24 +108,32 @@ spec: state of this API Resource. properties: lastTransitionTime: - description: lastTransitionTime is the last time the condition - transitioned from one status to another. + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. format: date-time type: string message: - description: message is a human readable message indicating - details about the transition. + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. maxLength: 32768 type: string observedGeneration: - description: observedGeneration represents the .metadata.generation - that the condition was set based upon. + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. format: int64 minimum: 0 type: integer reason: - description: reason contains a programmatic identifier indicating - the reason for the condition's last transition. + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. maxLength: 1024 minLength: 1 pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ diff --git a/api/v1/config/crd/eno.azure.io_resourceslices.yaml b/api/v1/config/crd/eno.azure.io_resourceslices.yaml index 6e30679f..403f3495 100644 --- a/api/v1/config/crd/eno.azure.io_resourceslices.yaml +++ b/api/v1/config/crd/eno.azure.io_resourceslices.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: resourceslices.eno.azure.io spec: group: eno.azure.io diff --git a/api/v1/config/crd/eno.azure.io_symphonies.yaml b/api/v1/config/crd/eno.azure.io_symphonies.yaml index 785aeef3..96a222f7 100644 --- a/api/v1/config/crd/eno.azure.io_symphonies.yaml +++ b/api/v1/config/crd/eno.azure.io_symphonies.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: symphonies.eno.azure.io spec: group: eno.azure.io @@ -72,39 +72,14 @@ spec: - resource type: object type: array - overlayCredentials: + remoteResourceRefs: description: |- - OverlayCredentials specifies how to access the overlay cluster. - When set, the OverlaySyncController will use these credentials to sync - resources specified in OverlayResourceRefs. - properties: - key: - default: kubeconfig - description: Key within the secret containing the kubeconfig data - type: string - secretRef: - description: SecretRef references a Secret containing the kubeconfig - for the overlay cluster - properties: - name: - description: name is unique within a namespace to reference - a secret resource. - type: string - namespace: - description: namespace defines the space within which the - secret name must be unique. - type: string - type: object - required: - - secretRef - type: object - overlayResourceRefs: - description: |- - OverlayResourceRefs specifies resources to sync from the overlay cluster. + RemoteResourceRefs specifies resources to sync from the remote cluster. Each ref results in an InputMirror being created that can be bound as an input. + The remote cluster is accessed via the eno-reconciler's --remote-kubeconfig. items: - description: OverlayResourceRef defines a resource to sync from - an overlay cluster + description: RemoteResourceRef defines a resource to sync from a + remote cluster properties: key: description: |- @@ -114,10 +89,10 @@ spec: optional: default: false description: Optional indicates that synthesis can proceed if - this resource doesn't exist in the overlay. + this resource doesn't exist in the remote cluster. type: boolean resource: - description: Resource specifies what to fetch from the overlay + description: Resource specifies what to fetch from the remote cluster properties: group: diff --git a/api/v1/config/crd/eno.azure.io_synthesizers.yaml b/api/v1/config/crd/eno.azure.io_synthesizers.yaml index 28375a12..d9168d12 100644 --- a/api/v1/config/crd/eno.azure.io_synthesizers.yaml +++ b/api/v1/config/crd/eno.azure.io_synthesizers.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: synthesizers.eno.azure.io spec: group: eno.azure.io diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index a8bc459e..fcb17af9 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -352,42 +352,6 @@ func (in *Manifest) DeepCopy() *Manifest { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *RemoteResourceRef) DeepCopyInto(out *RemoteResourceRef) { - *out = *in - out.Resource = in.Resource - if in.SyncInterval != nil { - in, out := &in.SyncInterval, &out.SyncInterval - *out = new(metav1.Duration) - **out = **in - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceRef. -func (in *RemoteResourceRef) DeepCopy() *RemoteResourceRef { - if in == nil { - return nil - } - out := new(RemoteResourceRef) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *RemoteResourceSelector) DeepCopyInto(out *RemoteResourceSelector) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceSelector. -func (in *RemoteResourceSelector) DeepCopy() *RemoteResourceSelector { - if in == nil { - return nil - } - out := new(RemoteResourceSelector) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PodOverrides) DeepCopyInto(out *PodOverrides) { *out = *in @@ -439,6 +403,42 @@ func (in *Ref) DeepCopy() *Ref { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteResourceRef) DeepCopyInto(out *RemoteResourceRef) { + *out = *in + out.Resource = in.Resource + if in.SyncInterval != nil { + in, out := &in.SyncInterval, &out.SyncInterval + *out = new(metav1.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceRef. +func (in *RemoteResourceRef) DeepCopy() *RemoteResourceRef { + if in == nil { + return nil + } + out := new(RemoteResourceRef) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteResourceSelector) DeepCopyInto(out *RemoteResourceSelector) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceSelector. +func (in *RemoteResourceSelector) DeepCopy() *RemoteResourceSelector { + if in == nil { + return nil + } + out := new(RemoteResourceSelector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceBinding) DeepCopyInto(out *ResourceBinding) { *out = *in diff --git a/internal/controllers/remotesync/controller.go b/internal/controllers/remotesync/controller.go index f4daa4b6..54266956 100644 --- a/internal/controllers/remotesync/controller.go +++ b/internal/controllers/remotesync/controller.go @@ -39,10 +39,12 @@ import ( "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" ) const ( @@ -109,21 +111,22 @@ type Controller struct { // allowedKinds can be overridden for testing allowedKinds map[schema.GroupKind]bool - // reconcileQueue is used by informer callbacks to enqueue Symphony reconciles - reconcileQueue workqueue.TypedRateLimitingInterface[ctrl.Request] + // eventChan receives events from informer callbacks to trigger reconciles + eventChan chan event.TypedGenericEvent[*apiv1.Symphony] } // NewController creates a new RemoteSyncController and registers it with the manager. // The remoteConfig is the REST config for the remote cluster (typically from --remote-kubeconfig). // If remoteConfig is nil, the controller will not sync any resources. func NewController(mgr ctrl.Manager, remoteConfig *rest.Config) error { + // Create buffered event channel to receive watch events from informers + eventChan := make(chan event.TypedGenericEvent[*apiv1.Symphony], 100) + c := &Controller{ client: mgr.GetClient(), scheme: mgr.GetScheme(), allowedKinds: AllowedSyncKinds, - reconcileQueue: workqueue.NewTypedRateLimitingQueue( - workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](), - ), + eventChan: eventChan, } // Initialize the shared remote watcher if config is provided @@ -138,6 +141,7 @@ func NewController(mgr ctrl.Manager, remoteConfig *rest.Config) error { return ctrl.NewControllerManagedBy(mgr). For(&apiv1.Symphony{}). Owns(&apiv1.InputMirror{}). + WatchesRawSource(source.Channel(eventChan, &handler.TypedEnqueueRequestForObject[*apiv1.Symphony]{})). WithLogConstructor(manager.NewLogConstructor(mgr, "remoteSyncController")). Complete(c) } @@ -295,18 +299,20 @@ func (w *remoteWatcher) enqueueReconcile(ctx context.Context, symphony *apiv1.Sy // Check if this object matches any of our refs for _, ref := range refs { if matchesRef(u, ref) { - logger.V(2).Info("remote resource changed, enqueueing reconcile", + logger.V(1).Info("remote resource changed, enqueueing reconcile", "resource", u.GetName(), "namespace", u.GetNamespace(), "key", ref.Key, ) - // Enqueue the symphony for reconcile - w.controller.reconcileQueue.Add(ctrl.Request{ - NamespacedName: types.NamespacedName{ - Name: symphony.Name, - Namespace: symphony.Namespace, - }, - }) + // Send event through the channel to trigger reconcile + // Use non-blocking send to avoid blocking informer callbacks + select { + case w.controller.eventChan <- event.TypedGenericEvent[*apiv1.Symphony]{ + Object: symphony, + }: + default: + logger.V(1).Info("event channel full, reconcile will happen on next poll") + } return } } @@ -320,7 +326,7 @@ func matchesRef(obj *unstructured.Unstructured, ref apiv1.RemoteResourceRef) boo // pluralize converts a Kind to its plural resource name (simple heuristic) func pluralize(kind string) string { - // Use strings.ToLower to properly lowercase the entire string + // Use strings.ToLower to properly lowercase the entire string (first letter only is insufficient for multi-word kinds like configMap) lower := strings.ToLower(kind) if lower[len(lower)-1] == 's' { return lower + "es"