From bcc80bfae7326244c0b4281cfe21f59dce63cee2 Mon Sep 17 00:00:00 2001 From: majialong Date: Wed, 17 Sep 2025 01:42:34 +0800 Subject: [PATCH 1/2] KAFKA-19684: Move Gauge#value to MetricValueProvider --- .../apache/kafka/common/metrics/Gauge.java | 11 +------- .../kafka/common/metrics/KafkaMetric.java | 13 +++------- .../kafka/common/metrics/Measurable.java | 17 ++++++++++++- .../common/metrics/MetricValueProvider.java | 19 +++++++++----- .../kafka/common/metrics/KafkaMetricTest.java | 25 +++++++++++++++++++ 5 files changed, 59 insertions(+), 26 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java index d71bbd853db16..cad640eea2b9a 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java @@ -20,13 +20,4 @@ * A gauge metric is an instantaneous reading of a particular value. */ @FunctionalInterface -public interface Gauge extends MetricValueProvider { - - /** - * Returns the current value associated with this gauge. - * @param config The configuration for this metric - * @param now The POSIX time in milliseconds the measurement is being taken - */ - T value(MetricConfig config, long now); - -} +public interface Gauge extends MetricValueProvider { } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index 1d31855db53f2..a5f7914d560b7 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.utils.Time; +import java.util.Objects; + public final class KafkaMetric implements Metric { private final MetricName metricName; @@ -41,9 +43,7 @@ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider va MetricConfig config, Time time) { this.metricName = metricName; this.lock = lock; - if (!(valueProvider instanceof Measurable) && !(valueProvider instanceof Gauge)) - throw new IllegalArgumentException("Unsupported metric value provider of class " + valueProvider.getClass()); - this.metricValueProvider = valueProvider; + this.metricValueProvider = Objects.requireNonNull(valueProvider, "valueProvider must not be null"); this.config = config; this.time = time; } @@ -75,12 +75,7 @@ public MetricName metricName() { public Object metricValue() { long now = time.milliseconds(); synchronized (this.lock) { - if (isMeasurable()) - return ((Measurable) metricValueProvider).measure(config, now); - else if (this.metricValueProvider instanceof Gauge) - return ((Gauge) metricValueProvider).value(config, now); - else - throw new IllegalStateException("Not a valid metric: " + this.metricValueProvider.getClass()); + return metricValueProvider.value(config, now); } } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java index 866cabad685c9..58b9caa06ed9e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java @@ -22,11 +22,26 @@ public interface Measurable extends MetricValueProvider { /** - * Measure this quantity and return the result as a double + * Measure this quantity and return the result as a double. + * * @param config The configuration for this metric * @param now The POSIX time in milliseconds the measurement is being taken * @return The measured value */ double measure(MetricConfig config, long now); + /** + * Measure this quantity and return the result as a double. + * + * This default implementation delegates to {@link #measure(MetricConfig, long)}. + * + * @param config The configuration for this metric + * @param now The POSIX time in milliseconds the measurement is being taken + * @return The measured value as a {@link Double} + */ + @Override + default Double value(MetricConfig config, long now) { + return measure(config, now); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java index 68028e73a8f17..e4d751c2bb2a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricValueProvider.java @@ -19,10 +19,17 @@ /** * Super-interface for {@link Measurable} or {@link Gauge} that provides * metric values. - *

- * In the future for Java8 and above, {@link Gauge#value(MetricConfig, long)} will be - * moved to this interface with a default implementation in {@link Measurable} that returns - * {@link Measurable#measure(MetricConfig, long)}. - *

*/ -public interface MetricValueProvider { } +@FunctionalInterface +public interface MetricValueProvider { + + /** + * Returns the current value associated with this metric. + * + * @param config The configuration for this metric + * @param now The POSIX time in milliseconds the measurement is being taken + * @return the current metric value + */ + T value(MetricConfig config, long now); + +} \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricTest.java index ab6f349ac53be..e3a9fb345d795 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/KafkaMetricTest.java @@ -48,4 +48,29 @@ public void testIsMeasurableWithGaugeProvider() { assertThrows(IllegalStateException.class, metric::measurable); } + @Test + public void testMeasurableValueReturnsZeroWhenNotMeasurable() { + MockTime time = new MockTime(); + MetricConfig config = new MetricConfig(); + Gauge gauge = (c, now) -> 7; + + KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, gauge, config, time); + assertEquals(0.0d, metric.measurableValue(time.milliseconds()), 0.0d); + } + + @Test + public void testKafkaMetricAcceptsNonMeasurableNonGaugeProvider() { + MetricValueProvider provider = (config, now) -> "metric value provider"; + KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, provider, new MetricConfig(), new MockTime()); + + Object value = metric.metricValue(); + assertEquals("metric value provider", value); + } + + @Test + public void testConstructorWithNullProvider() { + assertThrows(NullPointerException.class, () -> + new KafkaMetric(new Object(), METRIC_NAME, null, new MetricConfig(), new MockTime()) + ); + } } From 377c43a054cbff15a18c4ee8c1e9ec7a44cbdff4 Mon Sep 17 00:00:00 2001 From: majialong Date: Thu, 18 Sep 2025 00:14:11 +0800 Subject: [PATCH 2/2] Update metricValue method doc. --- .../java/org/apache/kafka/common/metrics/KafkaMetric.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index a5f7914d560b7..a9203ead0a044 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -67,9 +67,9 @@ public MetricName metricName() { } /** - * Take the metric and return the value, which could be a {@link Measurable} or a {@link Gauge} + * Take the metric and return the value via {@link MetricValueProvider#value(MetricConfig, long)}. + * * @return Return the metric value - * @throws IllegalStateException if the underlying metric is not a {@link Measurable} or a {@link Gauge}. */ @Override public Object metricValue() {