Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package org.apache.flink.connector.kafka.source.reader;

import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.metrics.MetricGroup;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.KAFKA_CONSUMER_METRIC_GROUP;
import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.KAFKA_SOURCE_READER_METRIC_GROUP;

/**
 * Kafka {@link MetricsReporter} that bridges the consumer's native metrics into Flink's metric
 * system. Each Kafka metric is registered as a Flink gauge on the source operator's metric group
 * under the scope {@code KafkaSourceReader.KafkaConsumer.Native}, with one nested sub-group per
 * Kafka metric tag (sorted by tag key for a stable scope).
 *
 * <p>Reporting is active only when the option
 * {@code KafkaSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS} is true (the default) AND the
 * consumer properties carry a Flink {@link MetricGroup} under {@link #METRIC_GROUP_CONFIG}.
 */
public class FlinkKafkaMetricReporter implements MetricsReporter {
    /** Config key under which the caller passes the Flink {@link MetricGroup} instance. */
    public static final String METRIC_GROUP_CONFIG = "flink_metric_group";

    private static final String NATIVE_TAG = "Native";

    /** Null when reporting is disabled; assigned once in {@link #configure(Map)}. */
    private MetricGroup metricGroup;

    /**
     * Kafka metric names already registered as gauges. Flink metric groups cannot replace a
     * registered gauge and log a name-collision warning on re-registration, so this set keeps
     * {@link #metricChange(KafkaMetric)} idempotent. Concurrent because Kafka may invoke reporter
     * callbacks from its own threads.
     */
    private final Set<Object> registeredMetrics = ConcurrentHashMap.newKeySet();

    @Override
    public void init(List<KafkaMetric> metrics) {
        // configure() runs before init(); addGauge() is a no-op when reporting is disabled.
        metrics.forEach(this::addGauge);
    }

    private void addGauge(KafkaMetric metric) {
        if (metricGroup == null) {
            return;
        }
        final var name = metric.metricName();
        // Register each metric name at most once: the gauge reads the live value through
        // metric::metricValue, so later metricChange() callbacks need no re-registration.
        if (!registeredMetrics.add(name)) {
            return;
        }
        // One nested group per tag, iterated in sorted key order for a deterministic scope.
        MetricGroup group = metricGroup;
        for (String key : new TreeSet<>(name.tags().keySet())) {
            group = group.addGroup(key, name.tags().get(key));
        }
        group.gauge(name.name(), metric::metricValue);
    }

    @Override
    public void metricChange(KafkaMetric kafkaMetric) {
        this.addGauge(kafkaMetric);
    }

    @Override
    public void metricRemoval(KafkaMetric kafkaMetric) {
        // Flink's MetricGroup has no API for unregistering a single metric, so removals are
        // ignored; a stale gauge keeps reporting the metric's last value.
    }

    @Override
    public void close() {
        // Drop state so any late reporter callbacks after close become no-ops.
        metricGroup = null;
        registeredMetrics.clear();
    }

    @Override
    public void configure(Map<String, ?> config) {
        Object value = config.get(KafkaSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS.key());
        // Kafka consumer metrics reporting is enabled by default; the option may arrive either
        // as a Boolean or in its String form depending on how the properties were assembled.
        boolean enabled = true;
        if (value instanceof Boolean) {
            enabled = (Boolean) value;
        } else if (value instanceof String) {
            enabled = Boolean.parseBoolean((String) value);
        }
        // Renamed from "metricGroup" to avoid shadowing the field of the same name.
        Object group = config.get(METRIC_GROUP_CONFIG);
        if (enabled && group instanceof MetricGroup) {
            this.metricGroup =
                    ((MetricGroup) group)
                            .addGroup(KAFKA_SOURCE_READER_METRIC_GROUP)
                            .addGroup(KAFKA_CONSUMER_METRIC_GROUP)
                            .addGroup(NATIVE_TAG);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public KafkaPartitionSplitReader(
Properties consumerProps = new Properties();
consumerProps.putAll(props);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
consumerProps.put(FlinkKafkaMetricReporter.METRIC_GROUP_CONFIG, context.metricGroup());
consumerProps.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, FlinkKafkaMetricReporter.class.getName());
setConsumerClientRack(consumerProps, rackIdSupplier);
this.consumer = new KafkaConsumer<>(consumerProps);
this.stoppingOffsets = new HashMap<>();
Expand Down