diff --git a/api/v1/config/crd/eno.azure.io_compositions.yaml b/api/v1/config/crd/eno.azure.io_compositions.yaml index c6fc4d42..f2760304 100644 --- a/api/v1/config/crd/eno.azure.io_compositions.yaml +++ b/api/v1/config/crd/eno.azure.io_compositions.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: compositions.eno.azure.io spec: group: eno.azure.io diff --git a/api/v1/config/crd/eno.azure.io_inputmirrors.yaml b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml new file mode 100644 index 00000000..2a2ccff9 --- /dev/null +++ b/api/v1/config/crd/eno.azure.io_inputmirrors.yaml @@ -0,0 +1,183 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.20.0 + name: inputmirrors.eno.azure.io +spec: + group: eno.azure.io + names: + kind: InputMirror + listKind: InputMirrorList + plural: inputmirrors + singular: inputmirror + scope: Namespaced + versions: + - additionalPrinterColumns: + - jsonPath: .spec.sourceResource.name + name: Source + type: string + - jsonPath: .status.conditions[?(@.type=="Synced")].status + name: Synced + type: string + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + name: v1 + schema: + openAPIV3Schema: + description: |- + InputMirror stores a copy of a resource from a remote cluster. + It is created and managed by the RemoteSyncController based on Symphony.spec.remoteResourceRefs. + Compositions can bind to InputMirrors just like any other resource. + properties: + apiVersion: + description: |- + APIVersion defines the versioned schema of this representation of an object. + Servers should convert recognized schemas to the latest internal value, and + may reject unrecognized values. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources + type: string + kind: + description: |- + Kind is a string value representing the REST resource this object represents. + Servers may infer this from the endpoint the client submits requests to. + Cannot be updated. + In CamelCase. + More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds + type: string + metadata: + type: object + spec: + properties: + key: + description: Key matches the Symphony's overlayResourceRef key + type: string + sourceResource: + description: SourceResource describes what resource to sync from the + remote cluster + properties: + group: + description: API Group of the resource (empty string for core + API group) + type: string + kind: + description: Kind of the resource (e.g., ConfigMap, Secret) + type: string + name: + description: Name of the resource + type: string + namespace: + description: Namespace of the resource (empty for cluster-scoped + resources) + type: string + version: + description: API Version of the resource + type: string + required: + - kind + - name + - version + type: object + symphonyRef: + description: SymphonyRef points to the owning Symphony + properties: + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + type: object + x-kubernetes-map-type: atomic + required: + - key + - sourceResource + - symphonyRef + type: object + status: + properties: + conditions: + description: Conditions describe the sync state + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array + x-kubernetes-list-map-keys: + - type + x-kubernetes-list-type: map + data: + description: |- + Data contains the actual resource data from the overlay cluster. + This is the full resource serialized as JSON. + type: object + x-kubernetes-preserve-unknown-fields: true + lastSyncTime: + description: LastSyncTime records when the resource was last successfully + synced + format: date-time + type: string + syncGeneration: + description: SyncGeneration tracks the source resource's resourceVersion + type: string + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/api/v1/config/crd/eno.azure.io_resourceslices.yaml b/api/v1/config/crd/eno.azure.io_resourceslices.yaml index 6e30679f..403f3495 100644 --- a/api/v1/config/crd/eno.azure.io_resourceslices.yaml +++ b/api/v1/config/crd/eno.azure.io_resourceslices.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: resourceslices.eno.azure.io spec: group: eno.azure.io diff --git a/api/v1/config/crd/eno.azure.io_symphonies.yaml b/api/v1/config/crd/eno.azure.io_symphonies.yaml index 7242466c..96a222f7 100644 --- a/api/v1/config/crd/eno.azure.io_symphonies.yaml +++ b/api/v1/config/crd/eno.azure.io_symphonies.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: symphonies.eno.azure.io spec: group: eno.azure.io @@ -72,6 +72,62 @@ spec: - resource type: object type: array + remoteResourceRefs: + description: |- + RemoteResourceRefs specifies resources to sync from the remote cluster. + Each ref results in an InputMirror being created that can be bound as an input. + The remote cluster is accessed via the eno-reconciler's --remote-kubeconfig. + items: + description: RemoteResourceRef defines a resource to sync from a + remote cluster + properties: + key: + description: |- + Key that will be used to reference this input in Composition bindings. + This key maps to an auto-created InputMirror resource. + type: string + optional: + default: false + description: Optional indicates that synthesis can proceed if + this resource doesn't exist in the remote cluster. + type: boolean + resource: + description: Resource specifies what to fetch from the remote + cluster + properties: + group: + description: API Group of the resource (empty string for + core API group) + type: string + kind: + description: Kind of the resource (e.g., ConfigMap, Secret) + type: string + name: + description: Name of the resource + type: string + namespace: + description: Namespace of the resource (empty for cluster-scoped + resources) + type: string + version: + description: API Version of the resource + type: string + required: + - kind + - name + - version + type: object + syncInterval: + default: 5m + description: SyncInterval determines how often to re-sync the + resource. + type: string + required: + - key + - resource + type: object + maxItems: 20 + type: array synthesisEnv: description: |- SynthesisEnv diff --git a/api/v1/config/crd/eno.azure.io_synthesizers.yaml b/api/v1/config/crd/eno.azure.io_synthesizers.yaml index 28375a12..d9168d12 100644 --- a/api/v1/config/crd/eno.azure.io_synthesizers.yaml +++ b/api/v1/config/crd/eno.azure.io_synthesizers.yaml @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.18.0 + controller-gen.kubebuilder.io/version: v0.20.0 name: synthesizers.eno.azure.io spec: group: eno.azure.io diff --git a/api/v1/inputmirror.go b/api/v1/inputmirror.go new file mode 100644 index 00000000..aa9c8377 --- /dev/null +++ b/api/v1/inputmirror.go @@ -0,0 +1,100 @@ +package v1 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// +kubebuilder:object:root=true +type InputMirrorList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []InputMirror `json:"items"` +} + +// InputMirror stores a copy of a resource from a remote cluster. +// It is created and managed by the RemoteSyncController based on Symphony.spec.remoteResourceRefs. +// Compositions can bind to InputMirrors just like any other resource. +// +// +kubebuilder:object:root=true +// +kubebuilder:subresource:status +// +kubebuilder:printcolumn:name="Source",type=string,JSONPath=`.spec.sourceResource.name` +// +kubebuilder:printcolumn:name="Synced",type=string,JSONPath=`.status.conditions[?(@.type=="Synced")].status` +// +kubebuilder:printcolumn:name="Age",type=date,JSONPath=`.metadata.creationTimestamp` +type InputMirror struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec InputMirrorSpec `json:"spec,omitempty"` + Status InputMirrorStatus `json:"status,omitempty"` +} + +type InputMirrorSpec struct { + // Key matches the Symphony's overlayResourceRef key + Key string `json:"key"` + + // SymphonyRef points to the owning Symphony + SymphonyRef corev1.LocalObjectReference `json:"symphonyRef"` + + // SourceResource describes what resource to sync from the remote cluster + SourceResource RemoteResourceSelector `json:"sourceResource"` +} + +type InputMirrorStatus struct { + // Data contains the actual resource data from the overlay cluster. + // This is the full resource serialized as JSON. + // +kubebuilder:pruning:PreserveUnknownFields + Data *runtime.RawExtension `json:"data,omitempty"` + + // LastSyncTime records when the resource was last successfully synced + LastSyncTime *metav1.Time `json:"lastSyncTime,omitempty"` + + // SyncGeneration tracks the source resource's resourceVersion + SyncGeneration string `json:"syncGeneration,omitempty"` + + // Conditions describe the sync state + // +listType=map + // +listMapKey=type + Conditions []metav1.Condition `json:"conditions,omitempty"` +} + +// RemoteResourceSelector describes a resource to sync from a remote cluster +type RemoteResourceSelector struct { + // API Group of the resource (empty string for core API group) + // +optional + Group string `json:"group,omitempty"` + + // API Version of the resource + Version string `json:"version"` + + // Kind of the resource (e.g., ConfigMap, Secret) + Kind string `json:"kind"` + + // Name of the resource + Name string `json:"name"` + + // Namespace of the resource (empty for cluster-scoped resources) + // +optional + Namespace string `json:"namespace,omitempty"` +} + +// RemoteResourceRef defines a resource to sync from a remote cluster +type RemoteResourceRef struct { + // Key that will be used to reference this input in Composition bindings. + // This key maps to an auto-created InputMirror resource. + Key string `json:"key"` + + // Resource specifies what to fetch from the remote cluster + Resource RemoteResourceSelector `json:"resource"` + + // SyncInterval determines how often to re-sync the resource. + // +kubebuilder:default="5m" + // +optional + SyncInterval *metav1.Duration `json:"syncInterval,omitempty"` + + // Optional indicates that synthesis can proceed if this resource doesn't exist in the remote cluster. + // +kubebuilder:default=false + // +optional + Optional bool `json:"optional,omitempty"` +} diff --git a/api/v1/symphony.go b/api/v1/symphony.go index 83cc27ef..66f61f12 100644 --- a/api/v1/symphony.go +++ b/api/v1/symphony.go @@ -39,6 +39,13 @@ type SymphonySpec struct { // Copied opaquely into the compositions managed by this symphony. // +kubebuilder:validation:MaxItems:=50 SynthesisEnv []EnvVar `json:"synthesisEnv,omitempty"` // deprecated synthesis env should always be variation scoped. + + // RemoteResourceRefs specifies resources to sync from the remote cluster. + // Each ref results in an InputMirror being created that can be bound as an input. + // The remote cluster is accessed via the eno-reconciler's --remote-kubeconfig. + // +optional + // +kubebuilder:validation:MaxItems:=20 + RemoteResourceRefs []RemoteResourceRef `json:"remoteResourceRefs,omitempty"` } type SymphonyStatus struct { diff --git a/api/v1/types.go b/api/v1/types.go index e5fdb085..20e69935 100644 --- a/api/v1/types.go +++ b/api/v1/types.go @@ -19,4 +19,5 @@ func init() { SchemeBuilder.Register(&CompositionList{}, &Composition{}) SchemeBuilder.Register(&SymphonyList{}, &Symphony{}) SchemeBuilder.Register(&ResourceSliceList{}, &ResourceSlice{}) + SchemeBuilder.Register(&InputMirrorList{}, &InputMirror{}) } diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 5a8d0e02..fcb17af9 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -7,7 +7,7 @@ package v1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -185,6 +185,113 @@ func (in *Input) DeepCopy() *Input { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirror) DeepCopyInto(out *InputMirror) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirror. +func (in *InputMirror) DeepCopy() *InputMirror { + if in == nil { + return nil + } + out := new(InputMirror) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirror) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorList) DeepCopyInto(out *InputMirrorList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]InputMirror, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorList. +func (in *InputMirrorList) DeepCopy() *InputMirrorList { + if in == nil { + return nil + } + out := new(InputMirrorList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *InputMirrorList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorSpec) DeepCopyInto(out *InputMirrorSpec) { + *out = *in + out.SymphonyRef = in.SymphonyRef + out.SourceResource = in.SourceResource +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorSpec. +func (in *InputMirrorSpec) DeepCopy() *InputMirrorSpec { + if in == nil { + return nil + } + out := new(InputMirrorSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *InputMirrorStatus) DeepCopyInto(out *InputMirrorStatus) { + *out = *in + if in.Data != nil { + in, out := &in.Data, &out.Data + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.LastSyncTime != nil { + in, out := &in.LastSyncTime, &out.LastSyncTime + *out = (*in).DeepCopy() + } + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InputMirrorStatus. +func (in *InputMirrorStatus) DeepCopy() *InputMirrorStatus { + if in == nil { + return nil + } + out := new(InputMirrorStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InputResource) DeepCopyInto(out *InputResource) { *out = *in @@ -296,6 +403,42 @@ func (in *Ref) DeepCopy() *Ref { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteResourceRef) DeepCopyInto(out *RemoteResourceRef) { + *out = *in + out.Resource = in.Resource + if in.SyncInterval != nil { + in, out := &in.SyncInterval, &out.SyncInterval + *out = new(metav1.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceRef. +func (in *RemoteResourceRef) DeepCopy() *RemoteResourceRef { + if in == nil { + return nil + } + out := new(RemoteResourceRef) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RemoteResourceSelector) DeepCopyInto(out *RemoteResourceSelector) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteResourceSelector. +func (in *RemoteResourceSelector) DeepCopy() *RemoteResourceSelector { + if in == nil { + return nil + } + out := new(RemoteResourceSelector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceBinding) DeepCopyInto(out *ResourceBinding) { *out = *in @@ -582,6 +725,13 @@ func (in *SymphonySpec) DeepCopyInto(out *SymphonySpec) { *out = make([]EnvVar, len(*in)) copy(*out, *in) } + if in.RemoteResourceRefs != nil { + in, out := &in.RemoteResourceRefs, &out.RemoteResourceRefs + *out = make([]RemoteResourceRef, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SymphonySpec. diff --git a/cmd/eno-reconciler/main.go b/cmd/eno-reconciler/main.go index dcf411d1..5445211e 100644 --- a/cmd/eno-reconciler/main.go +++ b/cmd/eno-reconciler/main.go @@ -16,6 +16,7 @@ import ( "github.com/Azure/eno/internal/cel" "github.com/Azure/eno/internal/controllers/liveness" "github.com/Azure/eno/internal/controllers/reconciliation" + "github.com/Azure/eno/internal/controllers/remotesync" "github.com/Azure/eno/internal/flowcontrol" "github.com/Azure/eno/internal/k8s" "github.com/Azure/eno/internal/logging" @@ -130,5 +131,12 @@ func run() error { return fmt.Errorf("constructing reconciliation controller: %w", err) } + // RemoteSyncController uses remoteConfig to watch resources on the remote cluster + // and syncs them to InputMirrors on the local cluster (mgr's client) + err = remotesync.NewController(mgr, remoteConfig) + if err != nil { + return fmt.Errorf("constructing remote sync controller: %w", err) + } + return mgr.Start(ctx) } diff --git a/internal/controllers/remotesync/controller.go b/internal/controllers/remotesync/controller.go new file mode 100644 index 00000000..54266956 --- /dev/null +++ b/internal/controllers/remotesync/controller.go @@ -0,0 +1,603 @@ +// Package remotesync implements the RemoteSyncController which syncs resources +// from remote clusters to the local cluster as InputMirror resources. +// +// ARCHITECTURE: +// This controller runs in the eno-reconciler and uses the reconciler's existing +// remote client (configured via --remote-kubeconfig) to watch and sync resources: +// 1. The controller is initialized with a pre-configured remote REST config +// 2. It sets up dynamic informers on the remote cluster for each Symphony's refs +// 3. When a watched resource changes, the informer triggers a reconcile +// 4. The reconciler syncs the changed resource to the corresponding InputMirror on the local cluster +// +// SECURITY CONSIDERATIONS: +// - Remote credentials are managed by the reconciler's --remote-kubeconfig flag +// - No credentials stored in Symphony specs +// - REST client has timeouts to prevent resource exhaustion +// - Only specified resource types can be synced (no arbitrary access) +package remotesync + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/Azure/eno/internal/manager" + "github.com/go-logr/logr" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + // ConditionTypeSynced indicates whether the InputMirror has been successfully synced + ConditionTypeSynced = "Synced" + + // FallbackSyncInterval is used when watches fail or as a safety net for missed events. + // Watches provide real-time updates, so this is only a fallback. + FallbackSyncInterval = 30 * time.Minute + + // WatchResyncPeriod is how often the informer re-lists all objects as a consistency check + WatchResyncPeriod = 10 * time.Minute + + // FinalizerName is the finalizer added to InputMirrors + FinalizerName = "eno.azure.io/remote-sync" + + // Client timeout settings for security + remoteClientTimeout = 30 * time.Second + remoteClientQPS = 5 + remoteClientBurst = 10 + + // maxSyncConcurrency limits parallel remote resource fetches per Symphony. + // This prevents overwhelming the remote cluster's API server while still + // providing significant speedup over sequential syncing. + // With 100 refs at ~50ms each: sequential = 5s, parallel (10) = 500ms + maxSyncConcurrency = 10 +) + +// AllowedSyncKinds defines which resource kinds can be synced from remote. +// This is a security control to prevent syncing sensitive resources. +var AllowedSyncKinds = map[schema.GroupKind]bool{ + {Group: "", Kind: "ConfigMap"}: true, + // Add other allowed kinds here as needed + // Explicitly NOT allowing: Secret, ServiceAccount, etc. +} + +// remoteWatcher manages watch connections to the remote cluster. +// It maintains dynamic informers for each resource type being watched. +type remoteWatcher struct { + mu sync.RWMutex + + // Dynamic client and informer factory for the remote cluster + dynamicClient dynamic.Interface + informerFactory dynamicinformer.DynamicSharedInformerFactory + stopCh chan struct{} + + // Track which GVRs we're watching + watchedGVRs map[schema.GroupVersionResource]struct{} + + // Reference to the controller for enqueuing reconciles + controller *Controller +} + +// Controller reconciles Symphonies with remote resource refs, syncing resources +// from remote clusters to InputMirror resources on the local cluster. +type Controller struct { + client client.Client + scheme *runtime.Scheme + + // remoteWatcher is the shared watcher for the remote cluster + // initialized once from the remote REST config passed to NewController + remoteWatcher *remoteWatcher + + // allowedKinds can be overridden for testing + allowedKinds map[schema.GroupKind]bool + + // eventChan receives events from informer callbacks to trigger reconciles + eventChan chan event.TypedGenericEvent[*apiv1.Symphony] +} + +// NewController creates a new RemoteSyncController and registers it with the manager. +// The remoteConfig is the REST config for the remote cluster (typically from --remote-kubeconfig). +// If remoteConfig is nil, the controller will not sync any resources. +func NewController(mgr ctrl.Manager, remoteConfig *rest.Config) error { + // Create buffered event channel to receive watch events from informers + eventChan := make(chan event.TypedGenericEvent[*apiv1.Symphony], 100) + + c := &Controller{ + client: mgr.GetClient(), + scheme: mgr.GetScheme(), + allowedKinds: AllowedSyncKinds, + eventChan: eventChan, + } + + // Initialize the shared remote watcher if config is provided + if remoteConfig != nil { + watcher, err := newRemoteWatcher(remoteConfig, c) + if err != nil { + return fmt.Errorf("creating remote watcher: %w", err) + } + c.remoteWatcher = watcher + } + + return ctrl.NewControllerManagedBy(mgr). + For(&apiv1.Symphony{}). + Owns(&apiv1.InputMirror{}). + WatchesRawSource(source.Channel(eventChan, &handler.TypedEnqueueRequestForObject[*apiv1.Symphony]{})). + WithLogConstructor(manager.NewLogConstructor(mgr, "remoteSyncController")). + Complete(c) +} + +// newRemoteWatcher creates a new remote watcher from the given REST config +func newRemoteWatcher(config *rest.Config, controller *Controller) (*remoteWatcher, error) { + // Create dynamic client for informers + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, fmt.Errorf("creating dynamic client: %w", err) + } + + // Create informer factory + stopCh := make(chan struct{}) + informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, WatchResyncPeriod) + + return &remoteWatcher{ + dynamicClient: dynamicClient, + informerFactory: informerFactory, + stopCh: stopCh, + watchedGVRs: make(map[schema.GroupVersionResource]struct{}), + controller: controller, + }, nil +} + +func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + logger := logr.FromContextOrDiscard(ctx) + + // Skip if no remote watcher configured (no --remote-kubeconfig) + if c.remoteWatcher == nil { + return ctrl.Result{}, nil + } + + symphony := &apiv1.Symphony{} + if err := c.client.Get(ctx, req.NamespacedName, symphony); err != nil { + if errors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + logger = logger.WithValues( + "symphonyName", symphony.Name, + "symphonyNamespace", symphony.Namespace, + ) + ctx = logr.NewContext(ctx, logger) + + // Skip if no remote resource refs defined + if len(symphony.Spec.RemoteResourceRefs) == 0 { + return ctrl.Result{}, nil + } + + // Handle deletion + if symphony.DeletionTimestamp != nil { + // InputMirrors will be garbage collected via owner references + return ctrl.Result{}, nil + } + + // Ensure informers are set up for all resource refs + if err := c.remoteWatcher.ensureInformers(ctx, symphony); err != nil { + logger.Error(err, "failed to setup informers") + return ctrl.Result{RequeueAfter: time.Minute}, nil + } + + // Sync all remote resource refs in parallel with bounded concurrency + // This handles the initial sync and any changes detected by watches + c.syncRemoteResourcesParallel(ctx, symphony, c.remoteWatcher) + + // Clean up InputMirrors for refs that no longer exist + if err := c.cleanupOrphanedMirrors(ctx, symphony); err != nil { + logger.Error(err, "failed to cleanup orphaned mirrors") + } + + // With watches, we only need periodic reconciles as a fallback safety net + return ctrl.Result{RequeueAfter: FallbackSyncInterval}, nil +} + +// ensureInformers sets up informers for all resource refs in the symphony +func (w *remoteWatcher) ensureInformers(ctx context.Context, symphony *apiv1.Symphony) error { + logger := logr.FromContextOrDiscard(ctx) + w.mu.Lock() + defer w.mu.Unlock() + + // Track which GVRs we need + neededGVRs := make(map[schema.GroupVersionResource][]apiv1.RemoteResourceRef) + for _, ref := range symphony.Spec.RemoteResourceRefs { + gvr := schema.GroupVersionResource{ + Group: ref.Resource.Group, + Version: ref.Resource.Version, + Resource: pluralize(ref.Resource.Kind), + } + neededGVRs[gvr] = append(neededGVRs[gvr], ref) + } + + // Set up informers for new GVRs + for gvr, refs := range neededGVRs { + if _, exists := w.watchedGVRs[gvr]; exists { + continue + } + + logger.V(1).Info("setting up informer for GVR", "gvr", gvr.String()) + + informer := w.informerFactory.ForResource(gvr).Informer() + + // Add event handler that triggers reconcile on changes + _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + w.enqueueReconcile(ctx, symphony, refs, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + w.enqueueReconcile(ctx, symphony, refs, newObj) + }, + DeleteFunc: func(obj interface{}) { + w.enqueueReconcile(ctx, symphony, refs, obj) + }, + }) + if err != nil { + return fmt.Errorf("adding event handler for %s: %w", gvr.String(), err) + } + + w.watchedGVRs[gvr] = struct{}{} + } + + // Start the informer factory (idempotent if already started) + w.informerFactory.Start(w.stopCh) + + // Wait for caches to sync + synced := w.informerFactory.WaitForCacheSync(w.stopCh) + for gvr, ok := range synced { + if !ok { + logger.Error(nil, "failed to sync informer cache", "gvr", gvr.String()) + } + } + + return nil +} + +// enqueueReconcile checks if the object matches any refs and enqueues a reconcile if so +func (w *remoteWatcher) enqueueReconcile(ctx context.Context, symphony *apiv1.Symphony, refs []apiv1.RemoteResourceRef, obj interface{}) { + logger := logr.FromContextOrDiscard(ctx) + + u, ok := obj.(*unstructured.Unstructured) + if !ok { + // Handle DeletedFinalStateUnknown + if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { + u, ok = tombstone.Obj.(*unstructured.Unstructured) + if !ok { + return + } + } else { + return + } + } + + // Check if this object matches any of our refs + for _, ref := range refs { + if matchesRef(u, ref) { + logger.V(1).Info("remote resource changed, enqueueing reconcile", + "resource", u.GetName(), + "namespace", u.GetNamespace(), + "key", ref.Key, + ) + // Send event through the channel to trigger reconcile + // Use non-blocking send to avoid blocking informer callbacks + select { + case w.controller.eventChan <- event.TypedGenericEvent[*apiv1.Symphony]{ + Object: symphony, + }: + default: + logger.V(1).Info("event channel full, reconcile will happen on next poll") + } + return + } + } +} + +// matchesRef checks if an unstructured object matches a remote resource ref +func matchesRef(obj *unstructured.Unstructured, ref apiv1.RemoteResourceRef) bool { + return obj.GetName() == ref.Resource.Name && + obj.GetNamespace() == ref.Resource.Namespace +} + +// pluralize converts a Kind to its plural resource name (simple heuristic) +func pluralize(kind string) string { + // Use strings.ToLower to properly lowercase the entire string (first letter only is insufficient for multi-word kinds like configMap) + lower := strings.ToLower(kind) + if lower[len(lower)-1] == 's' { + return lower + "es" + } + if lower[len(lower)-1] == 'y' { + return lower[:len(lower)-1] + "ies" + } + return lower + "s" +} + +// getResource fetches a resource from the remote cluster using the dynamic client +func (w *remoteWatcher) getResource(ctx context.Context, ref apiv1.RemoteResourceRef) (*unstructured.Unstructured, error) { + gvr := schema.GroupVersionResource{ + Group: ref.Resource.Group, + Version: ref.Resource.Version, + Resource: pluralize(ref.Resource.Kind), + } + + var obj *unstructured.Unstructured + var err error + + if ref.Resource.Namespace != "" { + obj, err = w.dynamicClient.Resource(gvr).Namespace(ref.Resource.Namespace).Get(ctx, ref.Resource.Name, metav1.GetOptions{}) + } else { + obj, err = w.dynamicClient.Resource(gvr).Get(ctx, ref.Resource.Name, metav1.GetOptions{}) + } + + return obj, err +} + +// syncResult holds the result of syncing a single remote resource +type syncResult struct { + key string + err error +} + +// syncRemoteResourcesParallel syncs all remote resource refs in parallel with bounded concurrency. +// This reduces reconcile latency from O(n * latency) to O(n/concurrency * latency). +// For example, with 100 refs at ~50ms each: sequential = 5s, parallel (10) = ~500ms. +func (c *Controller) syncRemoteResourcesParallel( + ctx context.Context, + symphony *apiv1.Symphony, + watcher *remoteWatcher, +) { + logger := logr.FromContextOrDiscard(ctx) + refs := symphony.Spec.RemoteResourceRefs + + if len(refs) == 0 { + return + } + + // Use a semaphore to limit concurrent overlay API calls + sem := make(chan struct{}, maxSyncConcurrency) + results := make(chan syncResult, len(refs)) + + // Use errgroup for structured concurrency, but we don't fail fast on errors + // since we want to sync as many refs as possible + g, ctx := errgroup.WithContext(ctx) + + for _, ref := range refs { + ref := ref // capture loop variable + g.Go(func() error { + // Acquire semaphore + select { + case sem <- struct{}{}: + defer func() { <-sem }() + case <-ctx.Done(): + results <- syncResult{key: ref.Key, err: ctx.Err()} + return nil + } + + err := c.syncRemoteResource(ctx, symphony, watcher, ref) + results <- syncResult{key: ref.Key, err: err} + return nil // Don't propagate errors - we handle them individually + }) + } + + // Wait for all goroutines to complete + _ = g.Wait() + close(results) + + // Process results + var successCount, failCount int + + for result := range results { + if result.err != nil { + logger.Error(result.err, "failed to sync remote resource", "key", result.key) + failCount++ + continue + } + successCount++ + } + + logger.V(1).Info("completed parallel remote sync", + "total", len(refs), + "success", successCount, + "failed", failCount, + ) +} + +// syncRemoteResource syncs a single remote resource to an InputMirror +func (c *Controller) syncRemoteResource( + ctx context.Context, + symphony *apiv1.Symphony, + watcher *remoteWatcher, + ref apiv1.RemoteResourceRef, +) error { + logger := logr.FromContextOrDiscard(ctx).WithValues("key", ref.Key, "resourceName", ref.Resource.Name) + + // SECURITY: Validate the resource kind is allowed to be synced + gk := schema.GroupKind{Group: ref.Resource.Group, Kind: ref.Resource.Kind} + if !c.allowedKinds[gk] { + return fmt.Errorf("security: resource kind %q is not allowed to be synced from remote", gk.String()) + } + + // Fetch from remote using the watcher's dynamic client + obj, err := watcher.getResource(ctx, ref) + if err != nil { + if errors.IsNotFound(err) && ref.Optional { + logger.V(1).Info("optional remote resource not found, skipping") + // Update InputMirror to reflect missing state + return c.updateMirrorMissing(ctx, symphony, ref) + } + return fmt.Errorf("getting remote resource: %w", err) + } + + // Create/Update InputMirror + mirrorName := inputMirrorName(symphony.Name, ref.Key) + mirror := &apiv1.InputMirror{ + ObjectMeta: metav1.ObjectMeta{ + Name: mirrorName, + Namespace: symphony.Namespace, + }, + } + + result, err := controllerutil.CreateOrUpdate(ctx, c.client, mirror, func() error { + // Set owner reference + if err := controllerutil.SetControllerReference(symphony, mirror, c.scheme); err != nil { + return err + } + + // Update spec + mirror.Spec.Key = ref.Key + mirror.Spec.SymphonyRef = corev1.LocalObjectReference{Name: symphony.Name} + mirror.Spec.SourceResource = ref.Resource + + return nil + }) + + if err != nil { + return fmt.Errorf("creating/updating InputMirror: %w", err) + } + + // Update status separately - CreateOrUpdate only updates spec, not status subresource + rawData, err := json.Marshal(obj.Object) + if err != nil { + return fmt.Errorf("marshaling resource data: %w", err) + } + mirror.Status.Data = &runtime.RawExtension{Raw: rawData} + mirror.Status.LastSyncTime = &metav1.Time{Time: time.Now()} + mirror.Status.SyncGeneration = obj.GetResourceVersion() + + // Update conditions + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced from remote cluster") + + if err := c.client.Status().Update(ctx, mirror); err != nil { + return fmt.Errorf("updating InputMirror status: %w", err) + } + + logger.V(1).Info("synced remote resource", "result", result, "mirrorName", mirrorName) + return nil +} + +// updateMirrorMissing updates the InputMirror to reflect that the source resource is missing +func (c *Controller) updateMirrorMissing( + ctx context.Context, + symphony *apiv1.Symphony, + ref apiv1.RemoteResourceRef, +) error { + mirrorName := inputMirrorName(symphony.Name, ref.Key) + mirror := &apiv1.InputMirror{} + err := c.client.Get(ctx, types.NamespacedName{Name: mirrorName, Namespace: symphony.Namespace}, mirror) + if errors.IsNotFound(err) { + // No mirror exists, nothing to update + return nil + } + if err != nil { + return err + } + + // Update condition to reflect missing state + setSyncedCondition(mirror, false, "SourceNotFound", "Optional source resource not found in remote cluster") + mirror.Status.Data = nil + + return c.client.Status().Update(ctx, mirror) +} + +// cleanupOrphanedMirrors removes InputMirrors for refs that no longer exist in the Symphony +func (c *Controller) cleanupOrphanedMirrors(ctx context.Context, symphony *apiv1.Symphony) error { + logger := logr.FromContextOrDiscard(ctx) + + // List all InputMirrors owned by this Symphony + mirrors := &apiv1.InputMirrorList{} + if err := c.client.List(ctx, mirrors, + client.InNamespace(symphony.Namespace), + client.MatchingFields{"spec.symphonyRef.name": symphony.Name}, + ); err != nil { + // If the index isn't set up, fall back to filtering manually + if err := c.client.List(ctx, mirrors, client.InNamespace(symphony.Namespace)); err != nil { + return err + } + } + + // Build set of expected mirror names + expected := make(map[string]struct{}) + for _, ref := range symphony.Spec.RemoteResourceRefs { + expected[inputMirrorName(symphony.Name, ref.Key)] = struct{}{} + } + + // Delete orphaned mirrors + for _, mirror := range mirrors.Items { + // Check if owned by this symphony + if mirror.Spec.SymphonyRef.Name != symphony.Name { + continue + } + if _, ok := expected[mirror.Name]; !ok { + logger.V(1).Info("deleting orphaned InputMirror", "mirrorName", mirror.Name) + if err := c.client.Delete(ctx, &mirror); err != nil && !errors.IsNotFound(err) { + return err + } + } + } + + return nil +} + +// inputMirrorName generates the name for an InputMirror +func inputMirrorName(symphonyName, key string) string { + return fmt.Sprintf("%s-%s", symphonyName, key) +} + +// setSyncedCondition updates the Synced condition on an InputMirror +func setSyncedCondition(mirror *apiv1.InputMirror, synced bool, reason, message string) { + status := metav1.ConditionFalse + if synced { + status = metav1.ConditionTrue + } + + now := metav1.Now() + condition := metav1.Condition{ + Type: ConditionTypeSynced, + Status: status, + ObservedGeneration: mirror.Generation, + LastTransitionTime: now, + Reason: reason, + Message: message, + } + + // Find and update existing condition or append + for i, c := range mirror.Status.Conditions { + if c.Type == ConditionTypeSynced { + if c.Status != condition.Status { + mirror.Status.Conditions[i] = condition + } else { + // Only update reason/message, keep transition time + mirror.Status.Conditions[i].Reason = reason + mirror.Status.Conditions[i].Message = message + mirror.Status.Conditions[i].ObservedGeneration = mirror.Generation + } + return + } + } + mirror.Status.Conditions = append(mirror.Status.Conditions, condition) +} diff --git a/internal/controllers/remotesync/controller_test.go b/internal/controllers/remotesync/controller_test.go new file mode 100644 index 00000000..fcc8e128 --- /dev/null +++ b/internal/controllers/remotesync/controller_test.go @@ -0,0 +1,172 @@ +package remotesync + +import ( + "context" + "testing" + + apiv1 "github.com/Azure/eno/api/v1" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +func TestInputMirrorName(t *testing.T) { + tests := []struct { + symphonyName string + key string + expected string + }{ + {"symphony-123", "metricsSettings", "symphony-123-metricsSettings"}, + {"my-symphony", "config", "my-symphony-config"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + result := inputMirrorName(tt.symphonyName, tt.key) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestSetSyncedCondition(t *testing.T) { + mirror := &apiv1.InputMirror{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-mirror", + Namespace: "test-ns", + Generation: 1, + }, + } + + // Test setting synced=true + setSyncedCondition(mirror, true, "SyncSuccess", "Successfully synced") + + require.Len(t, mirror.Status.Conditions, 1) + assert.Equal(t, ConditionTypeSynced, mirror.Status.Conditions[0].Type) + assert.Equal(t, metav1.ConditionTrue, mirror.Status.Conditions[0].Status) + assert.Equal(t, "SyncSuccess", mirror.Status.Conditions[0].Reason) + assert.Equal(t, "Successfully synced", mirror.Status.Conditions[0].Message) + + // Test updating to synced=false + setSyncedCondition(mirror, false, "SyncFailed", "Failed to sync") + + require.Len(t, mirror.Status.Conditions, 1) + assert.Equal(t, metav1.ConditionFalse, mirror.Status.Conditions[0].Status) + assert.Equal(t, "SyncFailed", mirror.Status.Conditions[0].Reason) +} + +func TestReconcile_NoRemoteRefs(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + symphony := &apiv1.Symphony{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-symphony", + Namespace: "test-ns", + }, + Spec: apiv1.SymphonySpec{ + // No RemoteResourceRefs + }, + } + + client := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(symphony). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + allowedKinds: AllowedSyncKinds, + // remoteWatcher nil - simulates no remote config provided + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test-symphony", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + assert.Equal(t, reconcile.Result{}, result) +} + +func TestReconcile_NoRemoteWatcher(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + symphony := &apiv1.Symphony{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-symphony", + Namespace: "test-ns", + }, + Spec: apiv1.SymphonySpec{ + RemoteResourceRefs: []apiv1.RemoteResourceRef{ + { + Key: "test", + Resource: apiv1.RemoteResourceSelector{ + Kind: "ConfigMap", + Version: "v1", + Name: "test-cm", + }, + }, + }, + }, + } + + client := fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(symphony). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + allowedKinds: AllowedSyncKinds, + // remoteWatcher nil - no remote config provided to eno-reconciler + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "test-symphony", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + // Should return empty result since no remote watcher available + assert.Equal(t, reconcile.Result{}, result) +} + +func TestReconcile_SymphonyNotFound(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + client := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + controller := &Controller{ + client: client, + scheme: scheme, + allowedKinds: AllowedSyncKinds, + } + + result, err := controller.Reconcile(context.Background(), reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: "non-existent", + Namespace: "test-ns", + }, + }) + + require.NoError(t, err) + assert.Equal(t, reconcile.Result{}, result) +}