NETOBSERV-595 add kafka compression option #2593
jpinsonneau wants to merge 1 commit into netobserv:main
Conversation
[APPROVALNOTIFIER] This PR is NOT APPROVED

Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment. The full list of commands accepted by this bot can be found here.
New images:

- quay.io/netobserv/network-observability-operator:bc36f6d
- quay.io/netobserv/network-observability-operator-bundle:v0.0.0-sha-bc36f6d
- quay.io/netobserv/network-observability-operator-catalog:v0.0.0-sha-bc36f6d

They will expire in two weeks. To deploy this build:

```shell
# Direct deployment, from operator repo
IMAGE=quay.io/netobserv/network-observability-operator:bc36f6d make deploy

# Or using operator-sdk
operator-sdk run bundle quay.io/netobserv/network-observability-operator-bundle:v0.0.0-sha-bc36f6d
```

Or as a Catalog Source:

```yaml
apiVersion: operators.coreos.com/v1alpha1
kind: CatalogSource
metadata:
  name: netobserv-dev
  namespace: openshift-marketplace
spec:
  sourceType: grpc
  image: quay.io/netobserv/network-observability-operator-catalog:v0.0.0-sha-bc36f6d
  displayName: NetObserv development catalog
  publisher: Me
  updateStrategy:
    registryPoll:
      interval: 1m
```
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #2593      +/-   ##
==========================================
- Coverage   72.39%   72.22%   -0.17%
==========================================
  Files         105      105
  Lines       10851    10853       +2
==========================================
- Hits         7856     7839      -17
- Misses       2518     2536      +18
- Partials      477      478       +1
```

Flags with carried forward coverage won't be shown.
@coderabbitai review

✅ Actions performed: review triggered.
📝 Walkthrough

This PR adds support for configurable Kafka message compression in the FlowCollector operator. A new optional `compression` field (accepting `none`, `gzip`, `snappy`, `lz4`, or `zstd`, with `lz4` as the default) is propagated from the FlowCollector spec to the eBPF agent and flowlogs-pipeline Kafka clients.
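Based on the walkthrough, enabling compression would look roughly like the sketch below. This is a hedged example: the field path `spec.kafka.compression` and the accepted values are taken from this PR's summary, and the address/topic values are placeholders; verify the exact schema against docs/FlowCollector.md.

```yaml
apiVersion: flows.netobserv.io/v1beta2
kind: FlowCollector
metadata:
  name: cluster
spec:
  deploymentModel: Kafka
  kafka:
    # Placeholder broker address and topic
    address: kafka-cluster-kafka-bootstrap.netobserv
    topic: network-flows
    # One of: none, gzip, snappy, lz4, zstd (lz4 is the default)
    compression: lz4
```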
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~12 minutes
🚥 Pre-merge checks: ✅ 3 passed
🧹 Nitpick comments (2)
internal/controller/ebpf/agent_controller.go (1)
452-461: Add a regression test for `KAFKA_COMPRESSION` env propagation. This mapping is critical to runtime behavior; a focused controller test asserting the env var value (`lz4` default + explicit override) would help prevent regressions.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `internal/controller/ebpf/agent_controller.go` around lines 452-461: the controller currently sets envKafkaCompression from coll.Spec.Kafka.Compression when coll.Spec.UseKafka() is true. Add a unit/regression test targeting the eBPF agent controller (e.g., the function that builds pod envs in agent_controller.go, or the method that calls append(config, ...)) that verifies envKafkaCompression is present and set to the default "lz4" when unspecified and to an explicit value when provided. In the test, construct a spec with UseKafka() true, assert the resulting env list contains corev1.EnvVar{Name: envKafkaCompression, Value: "<expected>"} for both default and override cases, and include a case covering coll.Spec.Kafka.Compression being empty to ensure defaulting behavior is enforced.

docs/KafkaCompression.md (1)

1-52: Consider adding operational guidance for configuration changes. The documentation is comprehensive and well-written. One optional enhancement: briefly mention what happens when operators change the `compression` setting on an existing deployment (e.g., whether it requires a rolling restart of the eBPF agent DaemonSet, whether messages already in Kafka are affected, etc.). This would help operators plan configuration changes in production environments.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `docs/KafkaCompression.md` around lines 1-52: add a short "Operational changes" note explaining that changing spec.kafka.compression or spec.exporters[].kafka.compression only affects producers (the eBPF agent) and requires a rolling restart of the eBPF agent DaemonSet to pick up the new codec; that existing messages already in Kafka are stored with their original codec and are not rewritten; that consumers (flowlogs-pipeline) automatically handle decompression, so no consumer config change is needed; and that Kafka brokers need no restart. Also mention that spec.agent.ebpf.kafkaBatchSize and batch boundaries determine how compression is applied, and that operators should plan restarts to avoid simultaneous DaemonSet pod restarts.
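The defaulting behavior the first nitpick asks to pin down can be illustrated with a self-contained sketch. Note the assumptions: `EnvVar` and `buildKafkaEnv` here are simplified stand-ins for the operator's real types and env-building code, and the `lz4` default is taken from this PR's summary.

```go
package main

import "fmt"

// EnvVar mirrors the shape of corev1.EnvVar for this standalone sketch.
type EnvVar struct {
	Name  string
	Value string
}

const envKafkaCompression = "KAFKA_COMPRESSION"

// buildKafkaEnv mimics the defaulting the nitpick wants tested:
// an empty compression setting falls back to "lz4" (assumed default).
func buildKafkaEnv(compression string) []EnvVar {
	if compression == "" {
		compression = "lz4"
	}
	return []EnvVar{{Name: envKafkaCompression, Value: compression}}
}

func main() {
	fmt.Println(buildKafkaEnv("")[0].Value)     // default case
	fmt.Println(buildKafkaEnv("zstd")[0].Value) // explicit override
}
```

A regression test in the operator would assert exactly these two cases against the env list produced by the controller.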
ℹ️ Review info

⚙️ Run configuration: defaults | Review profile: CHILL | Plan: Pro
Run ID: 482a435e-e0d7-4669-9e98-13cad9b41b01

📒 Files selected for processing (11)

- api/flowcollector/v1beta2/flowcollector_types.go
- bundle/manifests/flows.netobserv.io_flowcollectors.yaml
- bundle/manifests/netobserv-operator.clusterserviceversion.yaml
- config/crd/bases/flows.netobserv.io_flowcollectors.yaml
- docs/FlowCollector.md
- docs/KafkaCompression.md
- helm/crds/flows.netobserv.io_flowcollectors.yaml
- internal/controller/ebpf/agent_controller.go
- internal/controller/flowcollector_controller_iso_test.go
- internal/controller/flp/flp_pipeline_builder.go
- vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/encode_kafka.go
leandroberetta
left a comment
lgtm, thanks @jpinsonneau!
Description
- Default to `lz4` as it's the best tradeoff.

Observations
On a small two-node cluster, I see a 25% / 600MB gain on Kafka overall memory usage, while the amount of bytes transferred between the Kafka pool and eBPF / FLP is reduced by 50%, with similar CPU consumption in both scenarios.
However, eBPF agent and FLP CPU is slightly increased with compression enabled, and their memory usage is 50% higher (250MB).
In my scenario, I'm still gaining 350MB of memory by enabling compression.
Assisted-by: `claude-4.6-opus-high` for documentation

Dependencies
netobserv/flowlogs-pipeline#1220
Note: the eBPF agent already handles `kafkaCompression`: https://github.com/netobserv/netobserv-ebpf-agent/blob/release-1.11/pkg/config/config.go#L200-L202
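The review comments also mention a per-exporter setting (`spec.exporters[].kafka.compression`). A hedged sketch of what that could look like, with placeholder address and topic values; verify field names against docs/FlowCollector.md:

```yaml
spec:
  exporters:
    - type: Kafka
      kafka:
        # Placeholder external broker and topic
        address: external-kafka:9092
        topic: exported-flows
        # Per-exporter codec, independent of spec.kafka.compression
        compression: gzip
```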
Summary by CodeRabbit

New Features

- Configurable message compression (`none`, `gzip`, `snappy`, `lz4`, `zstd`) for Kafka brokers, with `lz4` as the default.

Documentation

- New Kafka compression documentation (docs/KafkaCompression.md).