From b4c8d5862e3212c7dad92984b9dd8499d15d402b Mon Sep 17 00:00:00 2001 From: David Eads Date: Mon, 10 Apr 2023 16:49:20 -0400 Subject: [PATCH] provide new interface with old method of reading featuregates --- .../featuregates/simple_featuregate_reader.go | 278 +++++++++++++++++ .../simple_featuregate_reader_test.go | 284 ++++++++++++++++++ 2 files changed, 562 insertions(+) create mode 100644 pkg/operator/configobserver/featuregates/simple_featuregate_reader.go create mode 100644 pkg/operator/configobserver/featuregates/simple_featuregate_reader_test.go diff --git a/pkg/operator/configobserver/featuregates/simple_featuregate_reader.go b/pkg/operator/configobserver/featuregates/simple_featuregate_reader.go new file mode 100644 index 0000000000..29770b55d9 --- /dev/null +++ b/pkg/operator/configobserver/featuregates/simple_featuregate_reader.go @@ -0,0 +1,278 @@ +package featuregates + +import ( + "context" + "fmt" + "os" + "reflect" + "sync" + "time" + + v1 "github.com/openshift/client-go/config/informers/externalversions/config/v1" + configlistersv1 "github.com/openshift/client-go/config/listers/config/v1" + "github.com/openshift/library-go/pkg/operator/events" + apierrors "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +type FeatureGateChangeHandlerFunc func(featureChange FeatureChange) + +type FeatureGateAccess interface { + SetChangeHandler(featureGateChangeHandlerFn FeatureGateChangeHandlerFunc) + + Run(ctx context.Context) + InitialFeatureGatesObserved() chan struct{} + CurrentFeatureGates() (enabled []string, disabled []string, err error) + AreInitialFeatureGatesObserved() bool +} + +type Features struct { + Enabled []string + Disabled []string +} + +type FeatureChange struct { + Previous *Features + New Features +} + +type defaultFeatureGateAccess struct { + desiredVersion string + missingVersionMarker string + clusterVersionLister configlistersv1.ClusterVersionLister + featureGateLister configlistersv1.FeatureGateLister + initialFeatureGatesObserved chan struct{} + + featureGateChangeHandlerFn FeatureGateChangeHandlerFunc + + lock sync.Mutex + started bool + initialFeatures Features + currentFeatures Features + + queue workqueue.RateLimitingInterface + eventRecorder events.Recorder +} + +// NewFeatureGateAccess returns a controller that keeps the list of enabled/disabled featuregates up to date. +// desiredVersion is the version of this operator that would be set on the clusteroperator.status.versions. +// missingVersionMarker is the stub version provided by the operator. If that is also the desired version, +// then the most either the desired clusterVersion or most recent version will be used. +// clusterVersionInformer is used when desiredVersion and missingVersionMarker are the same to derive the "best" version +// of featuregates to use. +// featureGateInformer is used to track changes to the featureGates once they are initially set. +// By default, when the enabled/disabled list of featuregates changes, os.Exit is called. This behavior can be +// overridden by calling SetChangeHandler to whatever you wish the behavior to be. +// A common construct is: +/* go +featureGateAccessor := NewFeatureGateAccess(args) +go featureGateAccessor.Run(ctx) + +select{ +case <- featureGateAccessor.InitialFeatureGatesObserved(): + enabled, disabled, _ := featureGateAccessor.CurrentFeatureGates() + klog.Infof("FeatureGates initialized: enabled=%v disabled=%v", enabled, disabled) +case <- time.After(1*time.Minute): + klog.Errorf("timed out waiting for FeatureGate detection") + return fmt.Errorf("timed out waiting for FeatureGate detection") +} + +// whatever other initialization you have to do, at this point you have FeatureGates to drive your behavior. +*/ +// That construct is easy. It is better to use the .spec.observedConfiguration construct common in library-go operators +// to avoid gating your general startup on FeatureGate determination, but if you haven't already got that mechanism +// this construct is easy. +func NewFeatureGateAccess( + desiredVersion, missingVersionMarker string, + clusterVersionInformer v1.ClusterVersionInformer, + featureGateInformer v1.FeatureGateInformer, + eventRecorder events.Recorder) FeatureGateAccess { + c := &defaultFeatureGateAccess{ + desiredVersion: desiredVersion, + missingVersionMarker: missingVersionMarker, + clusterVersionLister: clusterVersionInformer.Lister(), + featureGateLister: featureGateInformer.Lister(), + initialFeatureGatesObserved: make(chan struct{}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "feature-gate-detector"), + eventRecorder: eventRecorder, + } + c.SetChangeHandler(ForceExit) + + // we aren't expecting many + clusterVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.queue.Add("cluster") + }, + UpdateFunc: func(old, cur interface{}) { + c.queue.Add("cluster") + }, + DeleteFunc: func(uncast interface{}) { + c.queue.Add("cluster") + }, + }) + featureGateInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.queue.Add("cluster") + }, + UpdateFunc: func(old, cur interface{}) { + c.queue.Add("cluster") + }, + DeleteFunc: func(uncast interface{}) { + c.queue.Add("cluster") + }, + }) + + return c +} + +func ForceExit(featureChange FeatureChange) { + if featureChange.Previous != nil { + os.Exit(0) + } +} + +func (c *defaultFeatureGateAccess) SetChangeHandler(featureGateChangeHandlerFn FeatureGateChangeHandlerFunc) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.started { + panic("programmer error, cannot update the change handler after starting") + } + c.featureGateChangeHandlerFn = featureGateChangeHandlerFn +} + +func (c *defaultFeatureGateAccess) Run(ctx context.Context) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Infof("Starting feature-gate-detector") + defer klog.Infof("Shutting down feature-gate-detector") + + go wait.UntilWithContext(ctx, c.runWorker, time.Second) + + <-ctx.Done() +} + +func (c *defaultFeatureGateAccess) syncHandler(ctx context.Context) error { + desiredVersion := c.desiredVersion + if c.missingVersionMarker == c.desiredVersion { + clusterVersion, err := c.clusterVersionLister.Get("version") + if apierrors.IsNotFound(err) { + return nil // we will be re-triggered when it is created + } + if err != nil { + return err + } + + desiredVersion = clusterVersion.Status.Desired.Version + if len(desiredVersion) == 0 && len(clusterVersion.Status.History) > 0 { + desiredVersion = clusterVersion.Status.History[0].Version + } + } + + featureGate, err := c.featureGateLister.Get("cluster") + if apierrors.IsNotFound(err) { + return nil // we will be re-triggered when it is created + } + if err != nil { + return err + } + + features := Features{} + enabled, disabled, err := FeaturesGatesFromFeatureSets(featureGate) + if err != nil { + return err + } + features.Enabled = enabled + features.Disabled = disabled + + c.setFeatureGates(features) + + return nil +} + +func (c *defaultFeatureGateAccess) setFeatureGates(features Features) { + c.lock.Lock() + defer c.lock.Unlock() + + var previousFeatures *Features + if c.AreInitialFeatureGatesObserved() { + t := c.currentFeatures + previousFeatures = &t + } + + c.currentFeatures = features + + if !c.AreInitialFeatureGatesObserved() { + c.initialFeatures = features + close(c.initialFeatureGatesObserved) + c.eventRecorder.Eventf("FeatureGatesInitialized", "FeatureGates updated to %#v", c.currentFeatures) + } + + if previousFeatures == nil || !reflect.DeepEqual(*previousFeatures, c.currentFeatures) { + if previousFeatures != nil { + c.eventRecorder.Eventf("FeatureGatesModified", "FeatureGates updated to %#v", c.currentFeatures) + } + + c.featureGateChangeHandlerFn(FeatureChange{ + Previous: previousFeatures, + New: c.currentFeatures, + }) + } +} + +func (c *defaultFeatureGateAccess) InitialFeatureGatesObserved() chan struct{} { + return c.initialFeatureGatesObserved +} + +func (c *defaultFeatureGateAccess) AreInitialFeatureGatesObserved() bool { + select { + case <-c.InitialFeatureGatesObserved(): + return true + default: + return false + } +} + +func (c *defaultFeatureGateAccess) CurrentFeatureGates() ([]string, []string, error) { + c.lock.Lock() + defer c.lock.Unlock() + + if !c.AreInitialFeatureGatesObserved() { + return nil, nil, fmt.Errorf("featureGates not yet observed") + } + retEnabled := make([]string, len(c.currentFeatures.Enabled)) + retDisabled := make([]string, len(c.currentFeatures.Disabled)) + copy(retEnabled, c.currentFeatures.Enabled) + copy(retDisabled, c.currentFeatures.Disabled) + + return retEnabled, retDisabled, nil +} + +func (c *defaultFeatureGateAccess) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { + } +} + +func (c *defaultFeatureGateAccess) processNextWorkItem(ctx context.Context) bool { + dsKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(dsKey) + + err := c.syncHandler(ctx) + if err == nil { + c.queue.Forget(dsKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err)) + c.queue.AddRateLimited(dsKey) + + return true +} diff --git a/pkg/operator/configobserver/featuregates/simple_featuregate_reader_test.go b/pkg/operator/configobserver/featuregates/simple_featuregate_reader_test.go new file mode 100644 index 0000000000..1aedc61689 --- /dev/null +++ b/pkg/operator/configobserver/featuregates/simple_featuregate_reader_test.go @@ -0,0 +1,284 @@ +package featuregates + +import ( + "context" + "reflect" + "sync" + "testing" + + configv1 "github.com/openshift/api/config/v1" + configlistersv1 "github.com/openshift/client-go/config/listers/config/v1" + "github.com/openshift/library-go/pkg/operator/events" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" +) + +type testFeatureGateBuilder struct { + featureSetName string + versionToFeatures map[string]Features +} + +func featureGateBuilder() *testFeatureGateBuilder { + return &testFeatureGateBuilder{ + versionToFeatures: map[string]Features{}, + } +} + +func (f *testFeatureGateBuilder) specFeatureSet(featureSetName string) *testFeatureGateBuilder { + f.featureSetName = featureSetName + + return f +} + +func (f *testFeatureGateBuilder) enabled(version string, enabled ...string) *testFeatureGateBuilder { + curr := f.versionToFeatures[version] + curr.Enabled = enabled + f.versionToFeatures[version] = curr + + return f +} + +func (f *testFeatureGateBuilder) disabled(version string, disabled ...string) *testFeatureGateBuilder { + curr := f.versionToFeatures[version] + curr.Disabled = disabled + f.versionToFeatures[version] = curr + + return f +} + +func (f *testFeatureGateBuilder) toFeatureGate() *configv1.FeatureGate { + ret := &configv1.FeatureGate{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster"}, + Spec: configv1.FeatureGateSpec{ + FeatureGateSelection: configv1.FeatureGateSelection{ + FeatureSet: configv1.FeatureSet(f.featureSetName), + }, + }, + } + + return ret +} + +type featureChangeTracker struct { + history []FeatureChange +} + +func (f *featureChangeTracker) change(featureChange FeatureChange) { + f.history = append(f.history, featureChange) +} + +func Test_defaultFeatureGateAccess_syncHandler(t *testing.T) { + closedCh := make(chan struct{}) + close(closedCh) + + type fields struct { + desiredVersion string + missingVersionMarker string + initialFeatureGatesObserved chan struct{} + initialFeatures Features + currentFeatures Features + } + tests := []struct { + name string + firstFeatureGate *configv1.FeatureGate + secondFeatureGate *configv1.FeatureGate + clusterVersion string + + fields fields + + changeVerifier func(t *testing.T, history []FeatureChange) + wantErr bool + }{ + { + name: "read-explicit-version", + + firstFeatureGate: featureGateBuilder(). + specFeatureSet("features-for-desired-version"). + enabled("desired-version", "alpha", "bravo"). + disabled("desired-version", "charlie", "delta"). + toFeatureGate(), + fields: fields{ + desiredVersion: "desired-version", + }, + + changeVerifier: func(t *testing.T, history []FeatureChange) { + if len(history) != 1 { + t.Fatalf("bad changes: %v", history) + } + if history[0].Previous != nil { + t.Fatalf("bad changes: %v", history) + } + if !reflect.DeepEqual([]string{"alpha", "bravo"}, history[0].New.Enabled) { + t.Fatal(history[0].New.Enabled) + } + if !reflect.DeepEqual([]string{"charlie", "delta"}, history[0].New.Disabled) { + t.Fatal(history[0].New.Enabled) + } + }, + }, + { + name: "read-explicit-version-from-others", + + firstFeatureGate: featureGateBuilder(). + specFeatureSet("features-for-desired-version"). + enabled("desired-version", "alpha", "bravo"). + disabled("desired-version", "charlie", "delta"). + enabled("other-version", "yankee", "zulu"). + toFeatureGate(), + fields: fields{ + desiredVersion: "desired-version", + }, + + changeVerifier: func(t *testing.T, history []FeatureChange) { + if len(history) != 1 { + t.Fatalf("bad changes: %v", history) + } + if history[0].Previous != nil { + t.Fatalf("bad changes: %v", history) + } + if !reflect.DeepEqual([]string{"alpha", "bravo"}, history[0].New.Enabled) { + t.Fatal(history[0].New.Enabled) + } + if !reflect.DeepEqual([]string{"charlie", "delta"}, history[0].New.Disabled) { + t.Fatal(history[0].New.Enabled) + } + }, + }, + { + name: "no-change-means-no-extra-watch-call", + + firstFeatureGate: featureGateBuilder(). + specFeatureSet("features-for-desired-version"). + enabled("desired-version", "alpha", "bravo"). + disabled("desired-version", "charlie", "delta"). + toFeatureGate(), + secondFeatureGate: featureGateBuilder(). + specFeatureSet("features-for-desired-version"). + enabled("desired-version", "alpha", "bravo"). + disabled("desired-version", "charlie", "delta"). + toFeatureGate(), + fields: fields{ + desiredVersion: "desired-version", + }, + + changeVerifier: func(t *testing.T, history []FeatureChange) { + if len(history) != 1 { + t.Fatalf("bad changes: %v", history) + } + if history[0].Previous != nil { + t.Fatalf("bad changes: %v", history) + } + if !reflect.DeepEqual([]string{"alpha", "bravo"}, history[0].New.Enabled) { + t.Fatal(history[0].New.Enabled) + } + if !reflect.DeepEqual([]string{"charlie", "delta"}, history[0].New.Disabled) { + t.Fatal(history[0].New.Enabled) + } + }, + }, + { + name: "missing desiredVersion", + + firstFeatureGate: featureGateBuilder(). + specFeatureSet("features-for-missing-version"). + enabled("other-version", "alpha", "bravo"). + disabled("other-version", "charlie", "delta"). + toFeatureGate(), + fields: fields{ + desiredVersion: "missing-version", + }, + + wantErr: true, + }, + } + + // set the map. This is very ugly, but will hopefully be replaced by https://github.com/openshift/library-go/pull/1468/files shortly + configv1.FeatureSets["features-for-desired-version"] = &configv1.FeatureGateEnabledDisabled{ + Enabled: []string{"alpha", "bravo"}, + Disabled: []string{"charlie", "delta"}, + } + configv1.FeatureSets["features-for-other-version"] = &configv1.FeatureGateEnabledDisabled{ + Enabled: []string{"alpha", "bravo"}, + Disabled: []string{"charlie", "delta"}, + } + defer func() { + delete(configv1.FeatureSets, "features-for-desired-version") + }() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + ctx := context.Background() + _, cancel := context.WithCancel(ctx) + defer cancel() + + featureGateIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + featureGateLister := configlistersv1.NewFeatureGateLister(featureGateIndexer) + if tt.firstFeatureGate != nil { + featureGateIndexer.Add(tt.firstFeatureGate) + } + + var clusterVersionLister configlistersv1.ClusterVersionLister + if len(tt.clusterVersion) > 0 { + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + indexer.Add(&configv1.ClusterVersion{ + ObjectMeta: metav1.ObjectMeta{Name: "version"}, + Status: configv1.ClusterVersionStatus{ + History: []configv1.UpdateHistory{ + {Version: tt.clusterVersion}, + }, + }, + }) + clusterVersionLister = configlistersv1.NewClusterVersionLister(indexer) + } + + changeTracker := &featureChangeTracker{} + + initialFeatureGatesObserved := tt.fields.initialFeatureGatesObserved + if tt.fields.initialFeatureGatesObserved == nil { + initialFeatureGatesObserved = make(chan struct{}) + } + c := &defaultFeatureGateAccess{ + desiredVersion: tt.fields.desiredVersion, + missingVersionMarker: tt.fields.missingVersionMarker, + clusterVersionLister: clusterVersionLister, + featureGateLister: featureGateLister, + featureGateChangeHandlerFn: changeTracker.change, + initialFeatureGatesObserved: initialFeatureGatesObserved, + lock: sync.Mutex{}, + started: true, + initialFeatures: tt.fields.initialFeatures, + currentFeatures: tt.fields.currentFeatures, + eventRecorder: events.NewInMemoryRecorder("fakee"), + } + + if c.AreInitialFeatureGatesObserved() { + t.Fatal("have seen initial)") + } + + err := c.syncHandler(ctx) + switch { + case err != nil && tt.wantErr: + return + case err == nil && !tt.wantErr: + default: + t.Errorf("syncHandler() error = %v, wantErr %v", err, tt.wantErr) + } + + if !c.AreInitialFeatureGatesObserved() { + t.Fatal("haven't seen initial)") + } + + if tt.secondFeatureGate != nil { + if err := featureGateIndexer.Update(tt.secondFeatureGate); err != nil { + t.Fatal(err) + } + } + if err := c.syncHandler(ctx); (err != nil) != tt.wantErr { + t.Errorf("syncHandler() error = %v, wantErr %v", err, tt.wantErr) + } + + tt.changeVerifier(t, changeTracker.history) + }) + } +}