Skip to content

Commit cd0f4c0

Browse files
authored
[kafka_consumer] Fix context count for max_partition_contexts (DataDog#21223)
* Fix context count # Conflicts: # kafka_consumer/tests/test_unit.py * Edit changelog. * Fix changelog PR number. * Run the formatter.
1 parent 0a5913c commit cd0f4c0

File tree

3 files changed

+16
-2
lines changed

3 files changed

+16
-2
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix undercount of contexts for the max_partition_contexts configuration option.

kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ def check(self, _):
6262
broker_timestamps = defaultdict(dict)
6363
cluster_id = ""
6464
persistent_cache_key = "broker_timestamps_"
65+
consumer_contexts_count = self.count_consumer_contexts(consumer_offsets)
6566
try:
66-
if len(consumer_offsets) < self._context_limit:
67+
if consumer_contexts_count < self._context_limit:
6768
# Fetch highwater offsets
6869
# Expected format: ({(topic, partition): offset}, cluster_id)
6970
highwater_offsets, cluster_id = self.get_highwater_offsets(consumer_offsets)
@@ -80,7 +81,7 @@ def check(self, _):
8081
self.client.close_admin_client()
8182
raise
8283

83-
total_contexts = sum(len(v) for v in consumer_offsets.values()) + len(highwater_offsets)
84+
total_contexts = consumer_contexts_count + len(highwater_offsets)
8485
self.log.debug(
8586
"Total contexts: %s, Consumer offsets: %s, Highwater offsets: %s",
8687
total_contexts,
@@ -108,6 +109,9 @@ def check(self, _):
108109
if self.config._close_admin_client:
109110
self.client.close_admin_client()
110111

112+
def count_consumer_contexts(self, consumer_offsets):
113+
return sum(len(offsets) for offsets in consumer_offsets.values())
114+
111115
def get_consumer_offsets(self):
112116
# {(consumer_group, topic, partition): offset}
113117
self.log.debug('Getting consumer offsets')

kafka_consumer/tests/test_unit.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,6 +1001,15 @@ def test_build_schema_none_handling():
10011001
build_protobuf_schema(None)
10021002

10031003

1004+
def test_count_consumer_contexts(check, kafka_instance):
1005+
kafka_consumer_check = check(kafka_instance)
1006+
consumer_offsets = {
1007+
'consumer_group1': {('topic1', 'partition0'): 1, ('topic1', 'partition1'): 2}, # 2 contexts
1008+
'consumer_group2': {('topic2', 'partition0'): 3}, # 1 context
1009+
}
1010+
assert kafka_consumer_check.count_consumer_contexts(consumer_offsets) == 3
1011+
1012+
10041013
def test_consumer_group_state_fetched_once_per_group(check, kafka_instance, dd_run_check, aggregator):
10051014
mock_client = seed_mock_client()
10061015
# Set up two partitions for same topic to check multiple contexts in same consumer group

0 commit comments

Comments
 (0)