diff --git a/docs/ADVANCED_FILTERING.md b/docs/ADVANCED_FILTERING.md new file mode 100644 index 00000000..63f6fc5f --- /dev/null +++ b/docs/ADVANCED_FILTERING.md @@ -0,0 +1,162 @@ +# Advanced Filtering for Kubewatch + +## Overview + +The advanced filtering feature allows Kubewatch to filter out irrelevant Kubernetes events before sending them to Robusta, significantly reducing the performance impact on the Robusta system. This feature is particularly useful when monitoring large Kubernetes clusters where many events are generated but only a subset are actually relevant. + +## Configuration + +The filtering mechanism is controlled via the environment variable `ADVANCED_FILTERS`: + +```bash +export ADVANCED_FILTERS=true # Enable advanced filtering +export ADVANCED_FILTERS=false # Disable advanced filtering (default) +``` + +When not set, the feature defaults to `false` (disabled), maintaining backward compatibility. + +## Filtering Rules + +When advanced filtering is enabled, the following rules are applied: + +### Event Resources (api/v1/Event and events.k8s.io/v1/Event) + +- **Sent**: + - Warning events that are Created + - Any event with Reason "Evicted" (regardless of Type - Normal or Warning) +- **Filtered**: + - Normal events (unless Reason is "Evicted") + - Warning events with Update or Delete operations + +### Job Resources + +- **Always Sent**: + - Create events + - Delete events + +- **Conditionally Sent** (Update events): + - When the Job spec changes + - When the Job fails (status condition contains "Failed") + +- **Filtered**: Update events without spec changes or failures + +### Pod Resources + +- **Always Sent**: + - Create events + - Delete events + +- **Conditionally Sent** (Update events): + - When the Pod spec changes + - When any container (including init containers) has restartCount > 0 + - When any container is waiting with reason "ImagePullBackOff" + - When the Pod is evicted + - When any container is terminated with reason "OOMKilled" + +- **Filtered**: Update events without any of the above conditions + +### All Other Resources + +All events for resources not explicitly mentioned above (e.g., Deployments, Services, ConfigMaps, etc.) are sent without filtering. + +## Implementation Details + +The filtering logic is implemented in the `pkg/filter` package and integrated into the CloudEvent handler. The filter evaluates each event before it's sent to Robusta, checking the resource type and event characteristics against the defined rules. + +### Key Components: + +1. **Filter Package** (`pkg/filter/filter.go`): Contains the core filtering logic +2. **CloudEvent Handler Integration**: The filter is initialized and used in the CloudEvent handler +3. **Environment Variable**: `ADVANCED_FILTERS` controls whether filtering is active + +## Usage Example + +### Kubernetes Deployment + +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kubewatch +spec: + template: + spec: + containers: + - name: kubewatch + image: kubewatch:latest + env: + - name: ADVANCED_FILTERS + value: "true" + - name: KW_CLOUDEVENT_URL + value: "https://your-robusta-endpoint" +``` + +### Docker Run + +```bash +docker run -d \ + -e ADVANCED_FILTERS=true \ + -e KW_CLOUDEVENT_URL=https://your-robusta-endpoint \ + kubewatch:latest +``` + +## Testing + +To run the filter tests: + +```bash +cd pkg/filter +go test -v +``` + +## Performance Impact + +With advanced filtering enabled, you can expect: + +- **Reduced Network Traffic**: Fewer events sent to Robusta +- **Lower CPU/Memory Usage**: Robusta processes fewer irrelevant events +- **Improved Signal-to-Noise Ratio**: Only meaningful events are forwarded + +## Debugging + +When debugging is enabled (log level set to debug), filtered events are logged with details about why they were filtered: + +``` +DEBU[0001] Filtering out Event resource - type: Normal (only Warning events are sent) +DEBU[0002] Filtering out Pod update event - no significant changes detected +``` + +## Migration Guide + +1. **Test in Development**: Enable filtering in a development environment first +2. **Monitor Metrics**: Compare the number of events before and after enabling filtering +3. **Gradual Rollout**: Enable filtering on a subset of Kubewatch instances before full deployment +4. **Verify Coverage**: Ensure important events are still being captured + +## Troubleshooting + +### Events Not Being Received + +If expected events are not reaching Robusta after enabling filtering: + +1. Check the ADVANCED_FILTERS environment variable is set correctly +2. Review the filtering rules to ensure your use case is covered +3. Enable debug logging to see which events are being filtered +4. Temporarily disable filtering to confirm it's the cause + +### All Events Still Being Sent + +If filtering appears to have no effect: + +1. Verify ADVANCED_FILTERS is set to "true" (string value) +2. Check Kubewatch logs for "Advanced filtering is ENABLED" message +3. Ensure you're using the CloudEvent handler (filtering only works with CloudEvent handler) + +## Future Enhancements + +Potential improvements to the filtering system: + +- Configurable filtering rules via configuration file +- Per-resource-type filtering toggles +- Custom filtering expressions +- Filtering statistics and metrics \ No newline at end of file diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go new file mode 100644 index 00000000..79233c34 --- /dev/null +++ b/pkg/filter/filter.go @@ -0,0 +1,321 @@ +/* +Copyright 2024 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "os" + "reflect" + "strconv" + "strings" + + "github.com/bitnami-labs/kubewatch/pkg/event" + "github.com/sirupsen/logrus" + + batch_v1 "k8s.io/api/batch/v1" + api_v1 "k8s.io/api/core/v1" + events_v1 "k8s.io/api/events/v1" +) + +// Filter is the main filter struct +type Filter struct { + enabled bool +} + +// NewFilter creates a new filter instance +func NewFilter() *Filter { + enabled := false + if envVal := os.Getenv("ADVANCED_FILTERS"); envVal != "" { + parsedVal, err := strconv.ParseBool(envVal) + if err == nil { + enabled = parsedVal + } else { + logrus.Warnf("Invalid ADVANCED_FILTERS value: %s, defaulting to false", envVal) + } + } + + if enabled { + logrus.Info("Advanced filtering is ENABLED") + } else { + logrus.Info("Advanced filtering is DISABLED") + } + + return &Filter{ + enabled: enabled, + } +} + +// ShouldSendEvent determines if an event should be sent to Robusta +func (f *Filter) ShouldSendEvent(e event.Event) bool { + // If filtering is disabled, send all events + if !f.enabled { + return true + } + + // Apply filtering rules based on resource kind + switch e.Kind { + case "Event": + return f.shouldSendEventResource(e) + case "Job": + return f.shouldSendJobEvent(e) + case "Pod": + return f.shouldSendPodEvent(e) + default: + // For all other resources, send the event + return true + } +} + +// shouldSendEventResource filters Kubernetes Event resources +func (f *Filter) shouldSendEventResource(e event.Event) bool { + // For Event resources, only send warning events and only create events + if e.Reason != "Created" { + logrus.Debugf("Filtering out Event resource - reason: %s (only 'Created' events are sent)", e.Reason) + return false + } + + // Check the event reason - always send Evicted events regardless of type + isEvictedEvent := false + switch obj := e.Obj.(type) { + case *api_v1.Event: + if obj.Reason == "Evicted" { + isEvictedEvent = true + logrus.Debugf("Event resource with reason 'Evicted' will be sent regardless of type") + } + case *events_v1.Event: + if obj.Reason == "Evicted" { + isEvictedEvent = true + logrus.Debugf("Event resource with reason 'Evicted' will be sent regardless of type") + } + } + + if isEvictedEvent { + return true + } + + // Check if it's a warning event + switch obj := e.Obj.(type) { + case *api_v1.Event: + if obj.Type != api_v1.EventTypeWarning { + logrus.Debugf("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) + return false + } + case *events_v1.Event: + if obj.Type != api_v1.EventTypeWarning { + logrus.Debugf("Filtering out Event resource - type: %s (only Warning events are sent)", obj.Type) + return false + } + default: + // If we can't determine the type, send it to be safe + logrus.Warnf("Unable to determine Event type for filtering, sending event") + return true + } + + return true +} + +// shouldSendJobEvent filters Job events +func (f *Filter) shouldSendJobEvent(e event.Event) bool { + // Always send Create and Delete events + if e.Reason == "Created" || e.Reason == "Deleted" { + return true + } + + // For Update events, check if spec changed or job failed + if e.Reason == "Updated" { + job, ok := e.Obj.(*batch_v1.Job) + if !ok { + logrus.Warnf("Unable to cast Job object for filtering, sending event") + return true + } + + oldJob, ok := e.OldObj.(*batch_v1.Job) + if !ok { + // If we don't have the old object, send the event to be safe + return true + } + + // Check if spec changed + if !reflect.DeepEqual(job.Spec, oldJob.Spec) { + logrus.Debugf("Job %s spec changed, sending update event", job.Name) + return true + } + + // Check if job failed + for _, condition := range job.Status.Conditions { + if condition.Type == batch_v1.JobFailed && condition.Status == api_v1.ConditionTrue { + logrus.Debugf("Job %s failed, sending update event", job.Name) + return true + } + } + + logrus.Debugf("Filtering out Job update event - no spec change or failure detected") + return false + } + + // For other event types, don't send + return false +} + +// shouldSendPodEvent filters Pod events +func (f *Filter) shouldSendPodEvent(e event.Event) bool { + // Always send Create and Delete events + if e.Reason == "Created" || e.Reason == "Deleted" { + return true + } + + // For Update events, apply specific filters + if e.Reason == "Updated" { + pod, ok := e.Obj.(*api_v1.Pod) + if !ok { + logrus.Warnf("Unable to cast Pod object for filtering, sending event") + return true + } + + oldPod, ok := e.OldObj.(*api_v1.Pod) + if !ok { + // If we don't have the old object, send the event to be safe + return true + } + + // Check if spec changed + if !reflect.DeepEqual(pod.Spec, oldPod.Spec) { + logrus.Debugf("Pod %s spec changed, sending update event", pod.Name) + return true + } + + // Check for container restarts + if f.hasContainerRestarted(pod) { + logrus.Debugf("Pod %s has container restarts, sending update event", pod.Name) + return true + } + + // Check for ImagePullBackOff + if f.hasImagePullBackOff(pod) { + logrus.Debugf("Pod %s has ImagePullBackOff, sending update event", pod.Name) + return true + } + + // Check if pod is evicted + if f.isPodEvicted(pod) { + logrus.Debugf("Pod %s is evicted, sending update event", pod.Name) + return true + } + + // Check for OOMKilled + if f.hasOOMKilled(pod) { + logrus.Debugf("Pod %s has OOMKilled container, sending update event", pod.Name) + return true + } + + logrus.Debugf("Filtering out Pod update event - no significant changes detected") + return false + } + + // For other event types, don't send + return false +} + +// hasContainerRestarted checks if any container (including init containers) has restarted +func (f *Filter) hasContainerRestarted(pod *api_v1.Pod) bool { + // Check init containers + for _, containerStatus := range pod.Status.InitContainerStatuses { + if containerStatus.RestartCount > 0 { + return true + } + } + + // Check regular containers + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.RestartCount > 0 { + return true + } + } + + return false +} + +// hasImagePullBackOff checks if any container is waiting due to ImagePullBackOff +func (f *Filter) hasImagePullBackOff(pod *api_v1.Pod) bool { + // Check init containers + for _, containerStatus := range pod.Status.InitContainerStatuses { + if containerStatus.State.Waiting != nil && + containerStatus.State.Waiting.Reason == "ImagePullBackOff" { + return true + } + } + + // Check regular containers + for _, containerStatus := range pod.Status.ContainerStatuses { + if containerStatus.State.Waiting != nil && + containerStatus.State.Waiting.Reason == "ImagePullBackOff" { + return true + } + } + + return false +} + +// isPodEvicted checks if the pod has been evicted +func (f *Filter) isPodEvicted(pod *api_v1.Pod) bool { + // Check pod phase and reason + if pod.Status.Phase == api_v1.PodFailed && pod.Status.Reason == "Evicted" { + return true + } + + // Alternative check: look for eviction in pod status message + if strings.Contains(pod.Status.Message, "evicted") { + return true + } + + return false +} + +// hasOOMKilled checks if any container was OOMKilled +func (f *Filter) hasOOMKilled(pod *api_v1.Pod) bool { + // Check init containers + for _, containerStatus := range pod.Status.InitContainerStatuses { + if f.isContainerOOMKilled(containerStatus) { + return true + } + } + + // Check regular containers + for _, containerStatus := range pod.Status.ContainerStatuses { + if f.isContainerOOMKilled(containerStatus) { + return true + } + } + + return false +} + +// isContainerOOMKilled checks if a specific container status indicates OOMKilled +func (f *Filter) isContainerOOMKilled(status api_v1.ContainerStatus) bool { + // Check current state + if status.State.Terminated != nil && + status.State.Terminated.Reason == "OOMKilled" { + return true + } + + // Check last state + if status.LastTerminationState.Terminated != nil && + status.LastTerminationState.Terminated.Reason == "OOMKilled" { + return true + } + + return false +} diff --git a/pkg/filter/filter_test.go b/pkg/filter/filter_test.go new file mode 100644 index 00000000..35eb4dd4 --- /dev/null +++ b/pkg/filter/filter_test.go @@ -0,0 +1,465 @@ +/* +Copyright 2024 + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "os" + "testing" + + "github.com/bitnami-labs/kubewatch/pkg/event" + batch_v1 "k8s.io/api/batch/v1" + api_v1 "k8s.io/api/core/v1" + events_v1 "k8s.io/api/events/v1" +) + +func TestNewFilter(t *testing.T) { + tests := []struct { + name string + envValue string + expected bool + }{ + {"Disabled by default", "", false}, + {"Enabled with true", "true", true}, + {"Disabled with false", "false", false}, + {"Invalid value defaults to false", "invalid", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.envValue != "" { + os.Setenv("ADVANCED_FILTERS", tt.envValue) + defer os.Unsetenv("ADVANCED_FILTERS") + } + + filter := NewFilter() + if filter.enabled != tt.expected { + t.Errorf("Expected enabled=%v, got %v", tt.expected, filter.enabled) + } + }) + } +} + +func TestShouldSendEventResource(t *testing.T) { + filter := &Filter{enabled: true} + + tests := []struct { + name string + event event.Event + expected bool + }{ + { + name: "Warning Event Created - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: true, + }, + { + name: "Normal Event Created - Should Filter", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeNormal, + }, + }, + expected: false, + }, + { + name: "Warning Event Updated - Should Filter", + event: event.Event{ + Kind: "Event", + Reason: "Updated", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: false, + }, + { + name: "Warning Event Deleted - Should Filter", + event: event.Event{ + Kind: "Event", + Reason: "Deleted", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: false, + }, + { + name: "Warning EventsV1 Created - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &events_v1.Event{ + Type: api_v1.EventTypeWarning, + }, + }, + expected: true, + }, + { + name: "Evicted Event Normal Type - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &api_v1.Event{ + Type: api_v1.EventTypeNormal, + Reason: "Evicted", + }, + }, + expected: true, + }, + { + name: "Evicted EventsV1 Normal Type - Should Send", + event: event.Event{ + Kind: "Event", + Reason: "Created", + Obj: &events_v1.Event{ + Type: api_v1.EventTypeNormal, + Reason: "Evicted", + }, + }, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if result != tt.expected { + t.Errorf("Expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestShouldSendJobEvent(t *testing.T) { + filter := &Filter{enabled: true} + + jobSpec1 := batch_v1.JobSpec{ + Parallelism: intPtr(1), + } + jobSpec2 := batch_v1.JobSpec{ + Parallelism: intPtr(2), + } + + tests := []struct { + name string + event event.Event + expected bool + }{ + { + name: "Job Created - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Created", + Obj: &batch_v1.Job{}, + }, + expected: true, + }, + { + name: "Job Deleted - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Deleted", + Obj: &batch_v1.Job{}, + }, + expected: true, + }, + { + name: "Job Updated with Spec Change - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + Obj: &batch_v1.Job{ + Spec: jobSpec2, + }, + OldObj: &batch_v1.Job{ + Spec: jobSpec1, + }, + }, + expected: true, + }, + { + name: "Job Updated with Failure - Should Send", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + Obj: &batch_v1.Job{ + Spec: jobSpec1, + Status: batch_v1.JobStatus{ + Conditions: []batch_v1.JobCondition{ + { + Type: batch_v1.JobFailed, + Status: api_v1.ConditionTrue, + }, + }, + }, + }, + OldObj: &batch_v1.Job{ + Spec: jobSpec1, + }, + }, + expected: true, + }, + { + name: "Job Updated without Changes - Should Filter", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + Obj: &batch_v1.Job{ + Spec: jobSpec1, + }, + OldObj: &batch_v1.Job{ + Spec: jobSpec1, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if result != tt.expected { + t.Errorf("Expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestShouldSendPodEvent(t *testing.T) { + filter := &Filter{enabled: true} + + podSpec1 := api_v1.PodSpec{ + RestartPolicy: api_v1.RestartPolicyAlways, + } + podSpec2 := api_v1.PodSpec{ + RestartPolicy: api_v1.RestartPolicyNever, + } + + tests := []struct { + name string + event event.Event + expected bool + }{ + { + name: "Pod Created - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Created", + Obj: &api_v1.Pod{}, + }, + expected: true, + }, + { + name: "Pod Deleted - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Deleted", + Obj: &api_v1.Pod{}, + }, + expected: true, + }, + { + name: "Pod Updated with Spec Change - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec2, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Updated with Container Restart - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + RestartCount: 1, + }, + }, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Updated with ImagePullBackOff - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + State: api_v1.ContainerState{ + Waiting: &api_v1.ContainerStateWaiting{ + Reason: "ImagePullBackOff", + }, + }, + }, + }, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Evicted - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + Phase: api_v1.PodFailed, + Reason: "Evicted", + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod with OOMKilled Container - Should Send", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{ + { + State: api_v1.ContainerState{ + Terminated: &api_v1.ContainerStateTerminated{ + Reason: "OOMKilled", + }, + }, + }, + }, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: true, + }, + { + name: "Pod Updated without Significant Changes - Should Filter", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + Obj: &api_v1.Pod{ + Spec: podSpec1, + Status: api_v1.PodStatus{ + Phase: api_v1.PodRunning, + }, + }, + OldObj: &api_v1.Pod{ + Spec: podSpec1, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if result != tt.expected { + t.Errorf("Expected %v, got %v", tt.expected, result) + } + }) + } +} + +func TestShouldSendEventWithFilterDisabled(t *testing.T) { + filter := &Filter{enabled: false} + + tests := []struct { + name string + event event.Event + }{ + { + name: "Any Event", + event: event.Event{ + Kind: "Event", + Reason: "Updated", + }, + }, + { + name: "Any Pod", + event: event.Event{ + Kind: "Pod", + Reason: "Updated", + }, + }, + { + name: "Any Job", + event: event.Event{ + Kind: "Job", + Reason: "Updated", + }, + }, + { + name: "Any Other Resource", + event: event.Event{ + Kind: "Deployment", + Reason: "Updated", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filter.ShouldSendEvent(tt.event) + if !result { + t.Errorf("Expected all events to be sent when filter is disabled") + } + }) + } +} + +// Helper function to create int pointers +func intPtr(i int32) *int32 { + return &i +} diff --git a/pkg/handlers/cloudevent/cloudevent.go b/pkg/handlers/cloudevent/cloudevent.go index f05dd06b..95ba1b4e 100644 --- a/pkg/handlers/cloudevent/cloudevent.go +++ b/pkg/handlers/cloudevent/cloudevent.go @@ -28,6 +28,8 @@ import ( "github.com/bitnami-labs/kubewatch/config" "github.com/bitnami-labs/kubewatch/pkg/event" + "github.com/bitnami-labs/kubewatch/pkg/filter" + "github.com/bitnami-labs/kubewatch/pkg/metrics" "k8s.io/apimachinery/pkg/runtime" ) @@ -49,6 +51,7 @@ type CloudEvent struct { Url string StartTime uint64 Counter uint64 + Filter *filter.Filter } type CloudEventMessage struct { @@ -77,6 +80,7 @@ func (m *CloudEvent) Init(c *config.Config) error { m.Url = c.Handler.CloudEvent.Url m.StartTime = uint64(time.Now().Unix()) m.Counter = 0 + m.Filter = filter.NewFilter() if m.Url == "" { m.Url = os.Getenv("KW_CLOUDEVENT_URL") @@ -90,6 +94,28 @@ func (m *CloudEvent) Init(c *config.Config) error { } func (m *CloudEvent) Handle(e event.Event) { + // Apply filtering if enabled + if !m.Filter.ShouldSendEvent(e) { + logrus.Debugf("Event filtered out - Kind: %s, Reason: %s, Name: %s", e.Kind, e.Reason, e.Name) + return + } + + // Increment the sent metrics counter + // Map event.Reason to eventType for consistency with the total metrics + eventType := "unknown" + switch e.Reason { + case "Created": + eventType = "create" + case "Updated": + eventType = "update" + case "Deleted": + eventType = "delete" + } + + if metrics.EventsSentTotal != nil { + metrics.EventsSentTotal.WithLabelValues(e.Kind, eventType).Inc() + } + m.Counter++ // TODO: do we have to worry about threadsafety here? message := m.prepareMessage(e)