From 2e652fc81f3aed0041f92d100ccdb95f7fbb5a04 Mon Sep 17 00:00:00 2001 From: Arik Alon Date: Wed, 8 Oct 2025 16:44:21 +0300 Subject: [PATCH 1/3] Enable advanced filters - allow running kubewatch without sending the following events: Events: filter out Normal events (send only warning). Filter out Delete events Jobs: Send only spec changes, or failures related updates Pod: Send only spec changes, or failures related updates --- docs/ADVANCED_FILTERING.md | 160 ++++++++++ pkg/filter/filter.go | 302 ++++++++++++++++++ pkg/filter/filter_test.go | 441 ++++++++++++++++++++++++++ pkg/handlers/cloudevent/cloudevent.go | 9 + 4 files changed, 912 insertions(+) create mode 100644 docs/ADVANCED_FILTERING.md create mode 100644 pkg/filter/filter.go create mode 100644 pkg/filter/filter_test.go diff --git a/docs/ADVANCED_FILTERING.md b/docs/ADVANCED_FILTERING.md new file mode 100644 index 00000000..78793520 --- /dev/null +++ b/docs/ADVANCED_FILTERING.md @@ -0,0 +1,160 @@ +# Advanced Filtering for Kubewatch + +## Overview + +The advanced filtering feature allows Kubewatch to filter out irrelevant Kubernetes events before sending them to Robusta, significantly reducing the performance impact on the Robusta system. This feature is particularly useful when monitoring large Kubernetes clusters where many events are generated but only a subset are actually relevant. + +## Configuration + +The filtering mechanism is controlled via the environment variable `ADVANCED_FILTERS`: + +```bash +export ADVANCED_FILTERS=true # Enable advanced filtering +export ADVANCED_FILTERS=false # Disable advanced filtering (default) +``` + +When not set, the feature defaults to `false` (disabled), maintaining backward compatibility. 
+ +## Filtering Rules + +When advanced filtering is enabled, the following rules are applied: + +### Event Resources (api/v1/Event and events.k8s.io/v1/Event) + +- **Sent**: Only Warning events that are Created +- **Filtered**: + - Normal events (regardless of operation) + - Warning events with Update or Delete operations + +### Job Resources + +- **Always Sent**: + - Create events + - Delete events + +- **Conditionally Sent** (Update events): + - When the Job spec changes + - When the Job fails (status condition contains "Failed") + +- **Filtered**: Update events without spec changes or failures + +### Pod Resources + +- **Always Sent**: + - Create events + - Delete events + +- **Conditionally Sent** (Update events): + - When the Pod spec changes + - When any container (including init containers) has restartCount > 0 + - When any container is waiting with reason "ImagePullBackOff" + - When the Pod is evicted + - When any container is terminated with reason "OOMKilled" + +- **Filtered**: Update events without any of the above conditions + +### All Other Resources + +All events for resources not explicitly mentioned above (e.g., Deployments, Services, ConfigMaps, etc.) are sent without filtering. + +## Implementation Details + +The filtering logic is implemented in the `pkg/filter` package and integrated into the CloudEvent handler. The filter evaluates each event before it's sent to Robusta, checking the resource type and event characteristics against the defined rules. + +### Key Components: + +1. **Filter Package** (`pkg/filter/filter.go`): Contains the core filtering logic +2. **CloudEvent Handler Integration**: The filter is initialized and used in the CloudEvent handler +3. 
**Environment Variable**: `ADVANCED_FILTERS` controls whether filtering is active + +## Usage Example + +### Kubernetes Deployment + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kubewatch +spec: + template: + spec: + containers: + - name: kubewatch + image: kubewatch:latest + env: + - name: ADVANCED_FILTERS + value: "true" + - name: KW_CLOUDEVENT_URL + value: "https://your-robusta-endpoint" +``` + +### Docker Run + +```bash +docker run -d \ + -e ADVANCED_FILTERS=true \ + -e KW_CLOUDEVENT_URL=https://your-robusta-endpoint \ + kubewatch:latest +``` + +## Testing + +To run the filter tests: + +```bash +cd pkg/filter +go test -v +``` + +## Performance Impact + +With advanced filtering enabled, you can expect: + +- **Reduced Network Traffic**: Fewer events sent to Robusta +- **Lower CPU/Memory Usage**: Robusta processes fewer irrelevant events +- **Improved Signal-to-Noise Ratio**: Only meaningful events are forwarded + +## Debugging + +When debugging is enabled (log level set to debug), filtered events are logged with details about why they were filtered: + +``` +DEBU[0001] Filtering out Event resource - type: Normal (only Warning events are sent) +DEBU[0002] Filtering out Pod update event - no significant changes detected +``` + +## Migration Guide + +1. **Test in Development**: Enable filtering in a development environment first +2. **Monitor Metrics**: Compare the number of events before and after enabling filtering +3. **Gradual Rollout**: Enable filtering on a subset of Kubewatch instances before full deployment +4. **Verify Coverage**: Ensure important events are still being captured + +## Troubleshooting + +### Events Not Being Received + +If expected events are not reaching Robusta after enabling filtering: + +1. Check the ADVANCED_FILTERS environment variable is set correctly +2. Review the filtering rules to ensure your use case is covered +3. Enable debug logging to see which events are being filtered +4. 
Temporarily disable filtering to confirm it's the cause + +### All Events Still Being Sent + +If filtering appears to have no effect: + +1. Verify ADVANCED_FILTERS is set to "true" (string value) +2. Check Kubewatch logs for "Advanced filtering is ENABLED" message +3. Ensure you're using the CloudEvent handler (filtering only works with CloudEvent handler) + +## Future Enhancements + +Potential improvements to the filtering system: + +- Configurable filtering rules via configuration file +- Per-resource-type filtering toggles +- Custom filtering expressions +- Filtering statistics and metrics \ No newline at end of file diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go new file mode 100644 index 00000000..b382fcc8 --- /dev/null +++ b/pkg/filter/filter.go @@ -0,0 +1,302 @@ +/* +Copyright 2024 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package filter + +import ( + "os" + "reflect" + "strconv" + "strings" + + "github.com/bitnami-labs/kubewatch/pkg/event" + "github.com/sirupsen/logrus" + + batch_v1 "k8s.io/api/batch/v1" + api_v1 "k8s.io/api/core/v1" + events_v1 "k8s.io/api/events/v1" +) + +// Filter is the main filter struct +type Filter struct { + enabled bool +} + +// NewFilter creates a new filter instance +func NewFilter() *Filter { + enabled := false + if envVal := os.Getenv("ADVANCED_FILTERS"); envVal != "" { + parsedVal, err := strconv.ParseBool(envVal) + if err == nil { + enabled = parsedVal + } else { + logrus.Warnf("Invalid ADVANCED_FILTERS value: %s, defaulting to false", envVal) + } + } + + if enabled { + logrus.Info("Advanced filtering is ENABLED") + } else { + logrus.Info("Advanced filtering is DISABLED") + } + + return &Filter{ + enabled: enabled, + } +} + +// ShouldSendEvent determines if an event should be sent to Robusta +func (f *Filter) ShouldSendEvent(e event.Event) bool { + // If filtering is disabled, send all events + if !f.enabled { + return true + } + + // Apply filtering rules based on resource kind + switch e.Kind { + case "Event": + return f.shouldSendEventResource(e) + case "Job": + return f.shouldSendJobEvent(e) + case "Pod": + return f.shouldSendPodEvent(e) + default: + // For all other resources, send the event + return true + } +} + +// shouldSendEventResource filters Kubernetes Event resources +func (f *Filter) shouldSendEventResource(e event.Event) bool { + // For Event resources, only send warning events and only create events + if e.Reason != "Created" { + logrus.Infof("Filtering out Event resource - reason: %s (only 'Created' events are sent)", e.Reason) + return false + } + + // Check if it's a warning event + switch obj := e.Obj.(type) { + case *api_v1.Event: + if obj.Type != api_v1.EventTypeWarning { + logrus.Infof("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) + return false + } + case *events_v1.Event: + if 
obj.Type != api_v1.EventTypeWarning { + logrus.Infof("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) + return false + } + default: + // If we can't determine the type, send it to be safe + logrus.Warnf("Unable to determine Event type for filtering, sending event") + return true + } + + return true +} + +// shouldSendJobEvent filters Job events +func (f *Filter) shouldSendJobEvent(e event.Event) bool { + // Always send Create and Delete events + if e.Reason == "Created" || e.Reason == "Deleted" { + return true + } + + // For Update events, check if spec changed or job failed + if e.Reason == "Updated" { + job, ok := e.Obj.(*batch_v1.Job) + if !ok { + logrus.Warnf("Unable to cast Job object for filtering, sending event") + return true + } + + oldJob, ok := e.OldObj.(*batch_v1.Job) + if !ok { + // If we don't have the old object, send the event to be safe + return true + } + + // Check if spec changed + if !reflect.DeepEqual(job.Spec, oldJob.Spec) { + logrus.Infof("Job %s spec changed, sending update event", job.Name) + return true + } + + // Check if job failed + for _, condition := range job.Status.Conditions { + if condition.Type == batch_v1.JobFailed && condition.Status == api_v1.ConditionTrue { + logrus.Infof("Job %s failed, sending update event", job.Name) + return true + } + } + + logrus.Infof("Filtering out Job update event - no spec change or failure detected") + return false + } + + // For other event types, don't send + return false +} + +// shouldSendPodEvent filters Pod events +func (f *Filter) shouldSendPodEvent(e event.Event) bool { + // Always send Create and Delete events + if e.Reason == "Created" || e.Reason == "Deleted" { + return true + } + + // For Update events, apply specific filters + if e.Reason == "Updated" { + pod, ok := e.Obj.(*api_v1.Pod) + if !ok { + logrus.Warnf("Unable to cast Pod object for filtering, sending event") + return true + } + + oldPod, ok := e.OldObj.(*api_v1.Pod) + if !ok { + // If we 
don't have the old object, send the event to be safe + return true + } + + // Check if spec changed + if !reflect.DeepEqual(pod.Spec, oldPod.Spec) { + logrus.Infof("Pod %s spec changed, sending update event", pod.Name) + return true + } + + // Check for container restarts + if f.hasContainerRestarted(pod) { + logrus.Infof("Pod %s has container restarts, sending update event", pod.Name) + return true + } + + // Check for ImagePullBackOff + if f.hasImagePullBackOff(pod) { + logrus.Infof("Pod %s has ImagePullBackOff, sending update event", pod.Name) + return true + } + + // Check if pod is evicted + if f.isPodEvicted(pod) { + logrus.Infof("Pod %s is evicted, sending update event", pod.Name) + return true + } + + // Check for OOMKilled + if f.hasOOMKilled(pod) { + logrus.Infof("Pod %s has OOMKilled container, sending update event", pod.Name) + return true + } + + logrus.Infof("Filtering out Pod update event - no significant changes detected") + return false + } + + // For other event types, don't send + return false +} + +// hasContainerRestarted checks if any container (including init containers) has restarted +func (f *Filter) hasContainerRestarted(pod *api_v1.Pod) bool { + // Check init containers + for _, containerStatus := range pod.Status.InitContainerStatuses { + if containerStatus.RestartCount > 0 { + return true + } + } + + // Check regular containers + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.RestartCount > 0 { + return true + } + } + + return false +} + +// hasImagePullBackOff checks if any container is waiting due to ImagePullBackOff +func (f *Filter) hasImagePullBackOff(pod *api_v1.Pod) bool { + // Check init containers + for _, containerStatus := range pod.Status.InitContainerStatuses { + if containerStatus.State.Waiting != nil && + containerStatus.State.Waiting.Reason == "ImagePullBackOff" { + return true + } + } + + // Check regular containers + for _, containerStatus := range pod.Status.ContainerStatuses { + 
if containerStatus.State.Waiting != nil && + containerStatus.State.Waiting.Reason == "ImagePullBackOff" { + return true + } + } + + return false +} + +// isPodEvicted checks if the pod has been evicted +func (f *Filter) isPodEvicted(pod *api_v1.Pod) bool { + // Check pod phase and reason + if pod.Status.Phase == api_v1.PodFailed && pod.Status.Reason == "Evicted" { + return true + } + + // Alternative check: look for eviction in pod status message + if strings.Contains(pod.Status.Message, "evicted") { + return true + } + + return false +} + +// hasOOMKilled checks if any container was OOMKilled +func (f *Filter) hasOOMKilled(pod *api_v1.Pod) bool { + // Check init containers + for _, containerStatus := range pod.Status.InitContainerStatuses { + if f.isContainerOOMKilled(containerStatus) { + return true + } + } + + // Check regular containers + for _, containerStatus := range pod.Status.ContainerStatuses { + if f.isContainerOOMKilled(containerStatus) { + return true + } + } + + return false +} + +// isContainerOOMKilled checks if a specific container status indicates OOMKilled +func (f *Filter) isContainerOOMKilled(status api_v1.ContainerStatus) bool { + // Check current state + if status.State.Terminated != nil && + status.State.Terminated.Reason == "OOMKilled" { + return true + } + + // Check last state + if status.LastTerminationState.Terminated != nil && + status.LastTerminationState.Terminated.Reason == "OOMKilled" { + return true + } + + return false +} diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go new file mode 100644 index 00000000..c503543e --- /dev/null +++ b/pkg/filter/filter_test.go @@ -0,0 +1,441 @@ +/* +Copyright 2024 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "os" + "testing" + + "github.com/bitnami-labs/kubewatch/pkg/event" + batch_v1 "k8s.io/api/batch/v1" + api_v1 "k8s.io/api/core/v1" + events_v1 "k8s.io/api/events/v1" +) + +func TestNewFilter(t *testing.T) { + tests := []struct { + name string + envValue string + expected bool + }{ + {"Disabled by default", "", false}, + {"Enabled with true", "true", true}, + {"Disabled with false", "false", false}, + {"Invalid value defaults to false", "invalid", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.envValue != "" { + os.Setenv("ADVANCED_FILTERS", tt.envValue) + defer os.Unsetenv("ADVANCED_FILTERS") + } + + filter := NewFilter() + if filter.enabled != tt.expected { + t.Errorf("Expected enabled=%v, got %v", tt.expected, filter.enabled) + } + }) + } +} + +func TestShouldSendEventResource(t *testing.T) { + filter := &Filter{enabled: true} + + tests := []struct { + name string + event event.Event + expected bool + }{ + { + name: "Warning Event Created - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: true, + }, + { + name: "Normal Event Created - Should Filter", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeNormal, + }, + }, + expected: false, + }, + { + name: "Warning Event Updated - Should Filter", + event: event.Event{ + Kind: "Event", + Reason: "Updated", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: false, + }, 
+ { + name: "Warning Event Deleted - Should Filter", + event: event.Event{ + Kind: "Event", + Reason: "Deleted", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: false, + }, + { + name: "Warning EventsV1 Created - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &events_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if result != tt.expected { + t.Errorf("Expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestShouldSendJobEvent(t *testing.T) { + filter := &Filter{enabled: true} + + jobSpec1 := batch_v1.JobSpec{ + Parallelism: intPtr(1), + } + jobSpec2 := batch_v1.JobSpec{ + Parallelism: intPtr(2), + } + + tests := []struct { + name string + event event.Event + expected bool + }{ + { + name: "Job Created - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Created", + Obj: &batch_v1.Job{}, + }, + expected: true, + }, + { + name: "Job Deleted - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Deleted", + Obj: &batch_v1.Job{}, + }, + expected: true, + }, + { + name: "Job Updated with Spec Change - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + Obj: &batch_v1.Job{ + Spec: jobSpec2, + }, + OldObj: &batch_v1.Job{ + Spec: jobSpec1, + }, + }, + expected: true, + }, + { + name: "Job Updated with Failure - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + Obj: &batch_v1.Job{ + Spec: jobSpec1, + Status: batch_v1.JobStatus{ + Conditions: []batch_v1.JobCondition{ + { + Type: batch_v1.JobFailed, + Status: api_v1.ConditionTrue, + }, + }, + }, + }, + OldObj: &batch_v1.Job{ + Spec: jobSpec1, + }, + }, + expected: true, + }, + { + name: "Job Updated without Changes - Should Filter", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + Obj: &batch_v1.Job{ + Spec: jobSpec1, + }, + 
OldObj: &batch_v1.Job{ + Spec: jobSpec1, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if result != tt.expected { + t.Errorf("Expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestShouldSendPodEvent(t *testing.T) { + filter := &Filter{enabled: true} + + podSpec1 := api_v1.PodSpec{ + RestartPolicy: api_v1.RestartPolicyAlways, + } + podSpec2 := api_v1.PodSpec{ + RestartPolicy: api_v1.RestartPolicyNever, + } + + tests := []struct { + name string + event event.Event + expected bool + }{ + { + name: "Pod Created - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Created", + Obj: &api_v1.Pod{}, + }, + expected: true, + }, + { + name: "Pod Deleted - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Deleted", + Obj: &api_v1.Pod{}, + }, + expected: true, + }, + { + name: "Pod Updated with Spec Change - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec2, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Updated with Container Restart - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + RestartCount: 1, + }, + }, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Updated with ImagePullBackOff - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + State: api_v1.ContainerState{ + Waiting: &api_v1.ContainerStateWaiting{ + Reason: "ImagePullBackOff", + }, + }, + }, + }, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Evicted - Should Send", + event: 
event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + Phase: api_v1.PodFailed, + Reason: "Evicted", + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod with OOMKilled Container - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + State: api_v1.ContainerState{ + Terminated: &api_v1.ContainerStateTerminated{ + Reason: "OOMKilled", + }, + }, + }, + }, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Updated without Significant Changes - Should Filter", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + Phase: api_v1.PodRunning, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if result != tt.expected { + t.Errorf("Expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestShouldSendEventWithFilterDisabled(t *testing.T) { + filter := &Filter{enabled: false} + + tests := []struct { + name string + event event.Event + }{ + { + name: "Any Event", + event: event.Event{ + Kind: "Event", + Reason: "Updated", + }, + }, + { + name: "Any Pod", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + }, + }, + { + name: "Any Job", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + }, + }, + { + name: "Any Other Resource", + event: event.Event{ + Kind: "Deployment", + Reason: "Updated", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if !result { + t.Errorf("Expected all events to be sent when filter is disabled") + } + }) + } +} + +// Helper function 
to create int pointers +func intPtr(i int32) *int32 { + return &i +} diff --git a/pkg/handlers/cloudevent/cloudevent.go b/pkg/handlers/cloudevent/cloudevent.go index f05dd06b..0383dd02 100644 --- a/pkg/handlers/cloudevent/cloudevent.go +++ b/pkg/handlers/cloudevent/cloudevent.go @@ -28,6 +28,7 @@ import ( "github.com/bitnami-labs/kubewatch/config" "github.com/bitnami-labs/kubewatch/pkg/event" + "github.com/bitnami-labs/kubewatch/pkg/filter" "k8s.io/apimachinery/pkg/runtime" ) @@ -49,6 +50,7 @@ type CloudEvent struct { Url string StartTime uint64 Counter uint64 + Filter *filter.Filter } type CloudEventMessage struct { @@ -77,6 +79,7 @@ func (m *CloudEvent) Init(c *config.Config) error { m.Url = c.Handler.CloudEvent.Url m.StartTime = uint64(time.Now().Unix()) m.Counter = 0 + m.Filter = filter.NewFilter() if m.Url == "" { m.Url = os.Getenv("KW_CLOUDEVENT_URL") @@ -90,6 +93,12 @@ func (m *CloudEvent) Init(c *config.Config) error { } func (m *CloudEvent) Handle(e event.Event) { + // Apply filtering if enabled + if !m.Filter.ShouldSendEvent(e) { + logrus.Infof("Event filtered out - Kind: %s, Reason: %s, Name: %s", e.Kind, e.Reason, e.Name) + return + } + m.Counter++ // TODO: do we have to worry about threadsafety here? 
message := m.prepareMessage(e) From be25f8b3d07166847887510f9b9b56605e812724 Mon Sep 17 00:00:00 2001 From: Arik Alon Date: Wed, 8 Oct 2025 20:05:52 +0300 Subject: [PATCH 2/3] reduce logs to debug --- pkg/filter/filter.go | 24 ++++++++++++------------ pkg/handlers/cloudevent/cloudevent.go | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index b382fcc8..484e01ba 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -83,7 +83,7 @@ func (f *Filter) ShouldSendEvent(e event.Event) bool { func (f *Filter) shouldSendEventResource(e event.Event) bool { // For Event resources, only send warning events and only create events if e.Reason != "Created" { - logrus.Infof("Filtering out Event resource - reason: %s (only 'Created' events are sent)", e.Reason) + logrus.Debugf("Filtering out Event resource - reason: %s (only 'Created' events are sent)", e.Reason) return false } @@ -91,12 +91,12 @@ func (f *Filter) shouldSendEventResource(e event.Event) bool { switch obj := e.Obj.(type) { case *api_v1.Event: if obj.Type != api_v1.EventTypeWarning { - logrus.Infof("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) + logrus.Debugf("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) return false } case *events_v1.Event: if obj.Type != api_v1.EventTypeWarning { - logrus.Infof("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) + logrus.Debugf("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) return false } default: @@ -131,19 +131,19 @@ func (f *Filter) shouldSendJobEvent(e event.Event) bool { // Check if spec changed if !reflect.DeepEqual(job.Spec, oldJob.Spec) { - logrus.Infof("Job %s spec changed, sending update event", job.Name) + logrus.Debugf("Job %s spec changed, sending update event", job.Name) return true } // Check if job failed for _, condition := range job.Status.Conditions 
{ if condition.Type == batch_v1.JobFailed && condition.Status == api_v1.ConditionTrue { - logrus.Infof("Job %s failed, sending update event", job.Name) + logrus.Debugf("Job %s failed, sending update event", job.Name) return true } } - logrus.Infof("Filtering out Job update event - no spec change or failure detected") + logrus.Debugf("Filtering out Job update event - no spec change or failure detected") return false } @@ -174,35 +174,35 @@ func (f *Filter) shouldSendPodEvent(e event.Event) bool { // Check if spec changed if !reflect.DeepEqual(pod.Spec, oldPod.Spec) { - logrus.Infof("Pod %s spec changed, sending update event", pod.Name) + logrus.Debugf("Pod %s spec changed, sending update event", pod.Name) return true } // Check for container restarts if f.hasContainerRestarted(pod) { - logrus.Infof("Pod %s has container restarts, sending update event", pod.Name) + logrus.Debugf("Pod %s has container restarts, sending update event", pod.Name) return true } // Check for ImagePullBackOff if f.hasImagePullBackOff(pod) { - logrus.Infof("Pod %s has ImagePullBackOff, sending update event", pod.Name) + logrus.Debugf("Pod %s has ImagePullBackOff, sending update event", pod.Name) return true } // Check if pod is evicted if f.isPodEvicted(pod) { - logrus.Infof("Pod %s is evicted, sending update event", pod.Name) + logrus.Debugf("Pod %s is evicted, sending update event", pod.Name) return true } // Check for OOMKilled if f.hasOOMKilled(pod) { - logrus.Infof("Pod %s has OOMKilled container, sending update event", pod.Name) + logrus.Debugf("Pod %s has OOMKilled container, sending update event", pod.Name) return true } - logrus.Infof("Filtering out Pod update event - no significant changes detected") + logrus.Debugf("Filtering out Pod update event - no significant changes detected") return false } diff --git a/pkg/handlers/cloudevent/cloudevent.go b/pkg/handlers/cloudevent/cloudevent.go index 0383dd02..3efb691b 100644 --- a/pkg/handlers/cloudevent/cloudevent.go +++ 
b/pkg/handlers/cloudevent/cloudevent.go @@ -95,7 +95,7 @@ func (m *CloudEvent) Init(c *config.Config) error { func (m *CloudEvent) Handle(e event.Event) { // Apply filtering if enabled if !m.Filter.ShouldSendEvent(e) { - logrus.Infof("Event filtered out - Kind: %s, Reason: %s, Name: %s", e.Kind, e.Reason, e.Name) + logrus.Debugf("Event filtered out - Kind: %s, Reason: %s, Name: %s", e.Kind, e.Reason, e.Name) return } From f49b0388a7ba9fe57f308b40348b7ba83dcd651a Mon Sep 17 00:00:00 2001 From: Arik Alon Date: Wed, 15 Oct 2025 16:02:14 +0300 Subject: [PATCH 3/3] Include Normal Evicted events Add metrics --- docs/ADVANCED_FILTERING.md | 6 ++++-- pkg/filter/filter.go | 19 +++++++++++++++++++ pkg/filter/filter_test.go | 24 ++++++++++++++++++++++++ pkg/handlers/cloudevent/cloudevent.go | 17 +++++++++++++++++ 4 files changed, 64 insertions(+), 2 deletions(-) diff --git a/docs/ADVANCED_FILTERING.md b/docs/ADVANCED_FILTERING.md index 78793520..63f6fc5f 100644 --- a/docs/ADVANCED_FILTERING.md +++ b/docs/ADVANCED_FILTERING.md @@ -21,9 +21,11 @@ When advanced filtering is enabled, the following rules are applied: ### Event Resources (api/v1/Event and events.k8s.io/v1/Event) -- **Sent**: Only Warning events that are Created +- **Sent**: + - Warning events that are Created + - Created events with Reason "Evicted" (regardless of Type - Normal or Warning) - **Filtered**: - - Normal events (regardless of operation) + - Normal events (unless Created with Reason "Evicted") - Warning events with Update or Delete operations ### Job Resources diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 484e01ba..79233c34 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -87,6 +87,25 @@ func (f *Filter) shouldSendEventResource(e event.Event) bool { return false } + // Check the event reason - always send Evicted events regardless of type + isEvictedEvent := false + switch obj := e.Obj.(type) { + case *api_v1.Event: + if obj.Reason == "Evicted" { + isEvictedEvent = true + 
logrus.Debugf("Event resource with reason 'Evicted' will be sent regardless of type") + } + case *events_v1.Event: + if obj.Reason == "Evicted" { + isEvictedEvent = true + logrus.Debugf("Event resource with reason 'Evicted' will be sent regardless of type") + } + } + + if isEvictedEvent { + return true + } + // Check if it's a warning event switch obj := e.Obj.(type) { case *api_v1.Event: diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go index c503543e..35eb4dd4 100644 --- a/pkg/filter/filter_test.go +++ b/pkg/filter/filter_test.go @@ -116,6 +116,30 @@ func TestShouldSendEventResource(t *testing.T) { }, expected: true, }, + { + name: "Evicted Event Normal Type - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeNormal, + Reason: "Evicted", + }, + }, + expected: true, + }, + { + name: "Evicted EventsV1 Normal Type - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &events_v1.Event{ + Type: api_v1.EventTypeNormal, + Reason: "Evicted", + }, + }, + expected: true, + }, } for _, tt := range tests { diff --git a/pkg/handlers/cloudevent/cloudevent.go b/pkg/handlers/cloudevent/cloudevent.go index 3efb691b..95ba1b4e 100644 --- a/pkg/handlers/cloudevent/cloudevent.go +++ b/pkg/handlers/cloudevent/cloudevent.go @@ -29,6 +29,7 @@ import ( "github.com/bitnami-labs/kubewatch/config" "github.com/bitnami-labs/kubewatch/pkg/event" "github.com/bitnami-labs/kubewatch/pkg/filter" + "github.com/bitnami-labs/kubewatch/pkg/metrics" "k8s.io/apimachinery/pkg/runtime" ) @@ -99,6 +100,22 @@ func (m *CloudEvent) Handle(e event.Event) { return } + // Increment the sent metrics counter + // Map event.Reason to eventType for consistency with the total metrics + eventType := "unknown" + switch e.Reason { + case "Created": + eventType = "create" + case "Updated": + eventType = "update" + case "Deleted": + eventType = "delete" + } + + if metrics.EventsSentTotal != nil { + 
metrics.EventsSentTotal.WithLabelValues(e.Kind, eventType).Inc() + } + m.Counter++ // TODO: do we have to worry about threadsafety here? message := m.prepareMessage(e)