From a472af3e0505e4a7854fadc562b6e3a5c93d3c36 Mon Sep 17 00:00:00 2001 From: Ondrej Pokorny Date: Thu, 20 Nov 2025 15:36:58 +0100 Subject: [PATCH 1/2] feat: add on-demand gathering limits This commit adds an informer that watches updates to the DataGather CR and introduces functionality to limit the number of running on-demand gathering jobs. Signed-off-by: Ondrej Pokorny --- .../periodic/datagather_informer.go | 81 ++++++++++++++++++- pkg/controller/periodic/periodic.go | 74 +++++++++++++++++ 2 files changed, 152 insertions(+), 3 deletions(-) diff --git a/pkg/controller/periodic/datagather_informer.go b/pkg/controller/periodic/datagather_informer.go index 3f9c8201d..af788d494 100644 --- a/pkg/controller/periodic/datagather_informer.go +++ b/pkg/controller/periodic/datagather_informer.go @@ -2,9 +2,13 @@ package periodic import ( "context" + "slices" "strings" + insightsv1 "github.com/openshift/api/insights/v1" insightsInformers "github.com/openshift/client-go/insights/informers/externalversions" + insightsListers "github.com/openshift/client-go/insights/listers/insights/v1" + "github.com/openshift/insights-operator/pkg/controller/status" "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" "k8s.io/apimachinery/pkg/api/meta" @@ -17,24 +21,39 @@ const ( ) // DataGatherInformer is an interface providing information -// about newly create DataGather resources +// about newly created DataGather resources type DataGatherInformer interface { + // Controller provides the base controller functionality from library-go factory.Controller + // DataGatherCreated returns a receive-only channel that sends the name of newly created + // DataGather resources based on which the on-demand gathering is triggered DataGatherCreated() <-chan string + // Lister returns a DataGatherLister that provides cached access to all DataGather resources + // without making API requests to the Kubernetes API server + Lister() insightsListers.DataGatherLister + // DataGatherStatusChanged returns a receive-only channel that signals when a DataGather + // resource's status changes to a finished state (GatheringFailed or GatheringSucceeded). + // This is used to check if data gathering has completed and trigger reconciliation of pending gatherings. + DataGatherStatusChanged() <-chan struct{} } // dataGatherController is type implementing DataGatherInformer type dataGatherController struct { factory.Controller - ch chan string + ch chan string + lister insightsListers.DataGatherLister + statusChanged chan struct{} } // NewDataGatherInformer creates a new instance of the DataGatherInformer interface func NewDataGatherInformer(eventRecorder events.Recorder, insightsInf insightsInformers.SharedInformerFactory) (DataGatherInformer, error) { inf := insightsInf.Insights().V1().DataGathers().Informer() + lister := insightsInf.Insights().V1().DataGathers().Lister() dgCtrl := &dataGatherController{ - ch: make(chan string), + ch: make(chan string), + statusChanged: make(chan struct{}, 10), // buffered + lister: lister, } _, err := inf.AddEventHandler(dgCtrl.eventHandler()) if err != nil { @@ -70,6 +89,50 @@ func (d *dataGatherController) eventHandler() cache.ResourceEventHandler { } d.ch <- dgMetadata.GetName() }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldDG, ok := oldObj.(*insightsv1.DataGather) + if !ok { + klog.Errorf("Expected DataGather, got %T", oldObj) + return + } + + newDG, ok := newObj.(*insightsv1.DataGather) + if !ok { + klog.Errorf("Expected DataGather, got %T", newObj) + return + } + + // filter out dataGathers created for periodic gathering + if strings.HasPrefix(newDG.GetName(), periodicGatheringPrefix) { + return + } + + newCondition := status.GetConditionByType(newDG, status.Progressing) + finishedReasons := []string{status.GatheringFailedReason, status.GatheringSucceededReason} + // Continue only if the new condition is one of the finished conditions + if newCondition == nil || !slices.Contains(finishedReasons, newCondition.Reason) { + return + } + + oldCondition := status.GetConditionByType(oldDG, status.Progressing) + if oldCondition == nil { + return + } + + // Send signal only if the old condition is not equal to the new condition, which means + // the state changed from running to some of the finished conditions. + if oldCondition.Status != newCondition.Status || + oldCondition.Reason != newCondition.Reason || + !oldCondition.LastTransitionTime.Equal(&newCondition.LastTransitionTime) { + klog.Infof("DataGather %s status changed, signaling reconciliation", newDG.Name) + + select { + case d.statusChanged <- struct{}{}: + default: + // Channel full, signal already pending + } + } + }, } } @@ -78,3 +141,15 @@ func (d *dataGatherController) eventHandler() cache.ResourceEventHandler { func (d *dataGatherController) DataGatherCreated() <-chan string { return d.ch } + +// Lister returns a DataGatherLister that can be used to query +// the informer's cache without making API requests +func (d *dataGatherController) Lister() insightsListers.DataGatherLister { + return d.lister +} + +// DataGatherStatusChanged returns a channel providing the name of +// updated DataGather resource +func (d *dataGatherController) DataGatherStatusChanged() <-chan struct{} { + return d.statusChanged +} diff --git a/pkg/controller/periodic/periodic.go b/pkg/controller/periodic/periodic.go index 14c4b7828..a0d42c0a9 100644 --- a/pkg/controller/periodic/periodic.go +++ b/pkg/controller/periodic/periodic.go @@ -10,6 +10,7 @@ import ( "time" "golang.org/x/exp/slices" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -310,6 +311,8 @@ func (c *Controller) periodicTrigger(stopCh <-chan struct{}) { } } +const gatheringLimit = 3 + // onDemandGather listens to newly created DataGather resources and checks // the state of each resource. If the state is not an empty string, it means that // the corresponding job is already running or has been started and new data gathering @@ -319,6 +322,29 @@ func (c *Controller) onDemandGather(stopCh <-chan struct{}) { select { case <-stopCh: return + case <-c.dgInf.DataGatherStatusChanged(): + go func() { + klog.Info("DataGatherStatusChaged triggered") + dataGatherList, err := c.dgInf.Lister().List(labels.Everything()) + if err != nil { + klog.Errorf("Failed listing datagathers: %v", err) + return + } + + dgNotStarted, runningJobsCounter := countActiveGatheringJobs(dataGatherList) + if runningJobsCounter >= gatheringLimit { + klog.Infof("GatheringLimit reached: %d/%d", runningJobsCounter, gatheringLimit) + return + } + + if dgNotStarted != nil { + ctx, cancel := context.WithTimeout(context.Background(), c.configAggregator.Config().DataReporting.Interval*4) + defer cancel() + + klog.Infof("Starting on-demand data gathering for the %s DataGather resource", dgNotStarted.Name) + c.runJobAndCheckResults(ctx, dgNotStarted, c.image) + } + }() case dgName := <-c.dgInf.DataGatherCreated(): go func() { ctx, cancel := context.WithTimeout(context.Background(), c.configAggregator.Config().DataReporting.Interval*4) @@ -329,6 +355,19 @@ func (c *Controller) onDemandGather(stopCh <-chan struct{}) { return } + dgLister := c.dgInf.Lister() + dataGatherList, err := dgLister.List(labels.Everything()) + if err != nil { + klog.Error("Failed to list datagathers") + } + + _, runningJobsCounter := countActiveGatheringJobs(dataGatherList) + klog.Infof("Created Gathering: counter: %d", runningJobsCounter) + if runningJobsCounter >= gatheringLimit { + klog.Infof("GatheringLimit reached: %d/%d", runningJobsCounter, gatheringLimit) + return + } + klog.Infof("Starting on-demand data gathering for the %s DataGather resource", dgName) c.runJobAndCheckResults(ctx, dataGather, c.image) }() @@ -336,6 +375,41 @@ func (c *Controller) onDemandGather(stopCh <-chan struct{}) { } } +// countActiveGatheringJobs analyzes a list of DataGather resources to determine which on-demand gathering jobs are currently active +// and identifies pending DataGather resources that have not yet started. +// It excludes periodic gathering DataGathers (those with "periodic-gathering-" prefix) from the analysis. +func countActiveGatheringJobs(dataGatherList []*insightsv1.DataGather) (pendingDataGather *insightsv1.DataGather, count int) { + var dgNotStarted *insightsv1.DataGather + + runningCounter := 0 + for _, dataGather := range dataGatherList { + if strings.HasPrefix(dataGather.GetName(), periodicGatheringPrefix) { + continue + } + + progressingConditions := status.GetConditionByType(dataGather, status.Progressing) + // Gathering was not started + if progressingConditions == nil && dgNotStarted == nil { + dgNotStarted = dataGather + continue + } + + // No datagather selected for starting + if dgNotStarted == nil && + progressingConditions != nil && progressingConditions.Reason == status.DataGatheringPendingReason { + dgNotStarted = dataGather + } + + // Gathering is running + if progressingConditions != nil && + progressingConditions.Status == metav1.ConditionTrue && progressingConditions.Reason == status.GatheringReason { + runningCounter++ + } + } + + return dgNotStarted, runningCounter +} + // syncDataGatherCR listens for finished jobs and ensures the corresponding DataGather status is updated. // If a job finishes but the DataGather CR is not in a finished state, it sets it to GatheringFailedReason. // This catches cases where the job exits unexpectedly (e.g., OOM kill) before updating the status itself. From 91e69b98c5d22f09b5982f49e6748f436f686d24 Mon Sep 17 00:00:00 2001 From: Ondrej Pokorny Date: Thu, 27 Nov 2025 14:48:44 +0100 Subject: [PATCH 2/2] feat: add tests for on-demand job limiting Signed-off-by: Ondrej Pokorny --- .../periodic/datagather_informer_test.go | 308 ++++++++++++++++++ pkg/controller/periodic/periodic_test.go | 180 ++++++++++ 2 files changed, 488 insertions(+) create mode 100644 pkg/controller/periodic/datagather_informer_test.go diff --git a/pkg/controller/periodic/datagather_informer_test.go b/pkg/controller/periodic/datagather_informer_test.go new file mode 100644 index 000000000..e791afc16 --- /dev/null +++ b/pkg/controller/periodic/datagather_informer_test.go @@ -0,0 +1,308 @@ +package periodic + +import ( + "testing" + + insightsv1 "github.com/openshift/api/insights/v1" + "github.com/openshift/insights-operator/pkg/controller/status" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func Test_eventHandler_addFunc(t *testing.T) { + tests := []struct { + name string + dataGather insightsv1.DataGather + expectChannelMsg bool + expectedName string + }{ + { + name: "non-periodic DataGather triggers channel message", + dataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "on-demand-gather", + }, + }, + expectChannelMsg: true, + expectedName: "on-demand-gather", + }, + { + name: "periodic DataGather is filtered out", + dataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "periodic-gathering-12345", + }, + }, + expectChannelMsg: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dataGatherController := &dataGatherController{ + ch: make(chan string, 1), + statusChanged: make(chan struct{}), + } + handler := dataGatherController.eventHandler() + + // Act + handler.OnAdd(&tt.dataGather, false) + + // Assert + if tt.expectChannelMsg { + select { + case msg := <-dataGatherController.ch: + assert.Equal(t, tt.expectedName, msg, + "expected channel message %q, got %q", tt.expectedName, msg, + ) + default: + assert.Fail(t, "expected channel message but got none") + } + } else { + select { + case msg := <-dataGatherController.ch: + assert.Fail(t, "expected no channel message but got %q", msg) + default: + // Expected: no message + } + } + }) + } +} + +func Test_eventHandler_updateFunc(t *testing.T) { + tests := []struct { + name string + oldDataGather insightsv1.DataGather + newDataGather insightsv1.DataGather + expectStatusSignal bool + }{ + { + name: "status changed from running to succeeded triggers signal", + oldDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + newDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionFalse, + Reason: status.GatheringSucceededReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + expectStatusSignal: true, + }, + { + name: "status changed from running to failed triggers signal", + oldDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + newDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionFalse, + Reason: status.GatheringFailedReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + expectStatusSignal: true, + }, + { + name: "periodic DataGather update is filtered out", + oldDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "periodic-gathering-12345", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + newDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "periodic-gathering-12345", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionFalse, + Reason: status.GatheringSucceededReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + expectStatusSignal: false, + }, + { + name: "no status change does not trigger signal", + oldDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + newDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + expectStatusSignal: false, + }, + { + name: "update to non-finished does not trigger signal", + oldDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionFalse, + Reason: status.DataGatheringPendingReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + newDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + expectStatusSignal: false, + }, + { + name: "old DataGather without condition does not trigger signal", + oldDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + newDataGather: insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-gather", + }, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionFalse, + Reason: status.GatheringSucceededReason, + LastTransitionTime: metav1.Now(), + }, + }, + }, + }, + expectStatusSignal: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dataGatherController := &dataGatherController{ + ch: make(chan string), + statusChanged: make(chan struct{}, 10), + } + handler := dataGatherController.eventHandler() + + // Act + handler.OnUpdate(&tt.oldDataGather, &tt.newDataGather) + + // Assert + if tt.expectStatusSignal { + select { + case <-dataGatherController.statusChanged: + // Expected: signal received + default: + assert.Fail(t, "expected status change signal but got none") + } + } else { + select { + case <-dataGatherController.statusChanged: + assert.Fail(t, "expected no status change signal but got one") + default: + // Expected: no signal + } + } + }) + } +} diff --git a/pkg/controller/periodic/periodic_test.go b/pkg/controller/periodic/periodic_test.go index 3af1f4232..9125020e1 100644 --- a/pkg/controller/periodic/periodic_test.go +++ b/pkg/controller/periodic/periodic_test.go @@ -1913,3 +1913,183 @@ func TestGetDataGatherCR(t *testing.T) { }) } } + +func Test_CountActiveGatheringJobs(t *testing.T) { + tests := []struct { + name string + dataGatherList []*insightsv1.DataGather + expectedGatheringCount int + expectedPendingGatherer *insightsv1.DataGather + }{ + { + name: "multiple active gatherings, no pending gatherer", + dataGatherList: []*insightsv1.DataGather{ + { + ObjectMeta: metav1.ObjectMeta{Name: "on-demand-1"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "on-demand-2"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + }, + }, + }, + }, + }, + expectedGatheringCount: 2, + expectedPendingGatherer: nil, + }, + { + name: "empty datagather list", + dataGatherList: []*insightsv1.DataGather{}, + expectedGatheringCount: 0, + expectedPendingGatherer: nil, + }, + { + name: "one pending gatherer without progressing condition", + dataGatherList: []*insightsv1.DataGather{ + { + ObjectMeta: metav1.ObjectMeta{Name: "pending-gather"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + }, + expectedGatheringCount: 0, + expectedPendingGatherer: &insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{Name: "pending-gather"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + }, + { + name: "multiple pending gatherers - only one is returned", + dataGatherList: []*insightsv1.DataGather{ + { + ObjectMeta: metav1.ObjectMeta{Name: "pending-1"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "pending-2"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + }, + expectedGatheringCount: 0, + expectedPendingGatherer: &insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{Name: "pending-1"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + }, + { + name: "periodic gathering is excluded from count", + dataGatherList: []*insightsv1.DataGather{ + { + ObjectMeta: metav1.ObjectMeta{Name: "periodic-gathering-1"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + }, + }, + }, + }, + }, + expectedGatheringCount: 0, + expectedPendingGatherer: nil, + }, + { + name: "mixed active, pending, and periodic gatherings", + dataGatherList: []*insightsv1.DataGather{ + { + ObjectMeta: metav1.ObjectMeta{Name: "periodic-gathering-abc"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "on-demand-active"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionTrue, + Reason: status.GatheringReason, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "on-demand-pending"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + }, + expectedGatheringCount: 1, + expectedPendingGatherer: &insightsv1.DataGather{ + ObjectMeta: metav1.ObjectMeta{Name: "on-demand-pending"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{}, + }, + }, + }, + { + name: "completed gathering is not counted", + dataGatherList: []*insightsv1.DataGather{ + { + ObjectMeta: metav1.ObjectMeta{Name: "completed-gather"}, + Status: insightsv1.DataGatherStatus{ + Conditions: []metav1.Condition{ + { + Type: status.Progressing, + Status: metav1.ConditionFalse, + Reason: status.GatheringSucceededReason, + }, + }, + }, + }, + }, + expectedGatheringCount: 0, + expectedPendingGatherer: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actualPendingGatherers, count := countActiveGatheringJobs(tt.dataGatherList) + + assert.Equal(t, tt.expectedGatheringCount, count, + "expected gathering count: %d should match gathering count: %d", tt.expectedGatheringCount, count, + ) + assert.Equal(t, tt.expectedPendingGatherer, actualPendingGatherers, "expectedPendingGatherer does not match the actual pending gatherer") + }) + } +}