Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,4 @@
* A gauge metric is an instantaneous reading of a particular value.
*/
@FunctionalInterface
public interface Gauge<T> extends MetricValueProvider<T> {

/**
* Returns the current value associated with this gauge.
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
*/
T value(MetricConfig config, long now);

}
public interface Gauge<T> extends MetricValueProvider<T> { }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Gauge type seems redundant at this stage. Would it be worth filing a KIP to deprecate it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping the Gauge type can improves code readability. Readers immediately understand that the monitoring metric represents an instantaneous value, and it can also be equated with the concept of Gauge in mainstream monitoring systems. Directly using the MetricValueProvider may be a bit too abstract.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another follow-up (KIP) could be removing addMetric(MetricName metricName, Measurable measurable), which would let us drop the explicit type from the lambda
before

metrics.addMetric(metricName(metrics, "version", Map.of()), (Gauge<String>) (config, now) -> appInfo.getVersion());

after

metrics.addMetric(metricName(metrics, "version", Map.of()),  (config, now) -> appInfo.getVersion());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good suggestion, I've created a JIRA to track this issue:https://issues.apache.org/jira/browse/KAFKA-19729

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Time;

import java.util.Objects;

public final class KafkaMetric implements Metric {

private final MetricName metricName;
Expand All @@ -41,9 +43,7 @@ public KafkaMetric(Object lock, MetricName metricName, MetricValueProvider<?> va
MetricConfig config, Time time) {
this.metricName = metricName;
this.lock = lock;
if (!(valueProvider instanceof Measurable) && !(valueProvider instanceof Gauge))
throw new IllegalArgumentException("Unsupported metric value provider of class " + valueProvider.getClass());
this.metricValueProvider = valueProvider;
this.metricValueProvider = Objects.requireNonNull(valueProvider, "valueProvider must not be null");
this.config = config;
this.time = time;
}
Expand All @@ -67,20 +67,15 @@ public MetricName metricName() {
}

/**
* Take the metric and return the value, which could be a {@link Measurable} or a {@link Gauge}
* Take the metric and return the value via {@link MetricValueProvider#value(MetricConfig, long)}.
*
* @return Return the metric value
* @throws IllegalStateException if the underlying metric is not a {@link Measurable} or a {@link Gauge}.
*/
@Override
public Object metricValue() {
long now = time.milliseconds();
synchronized (this.lock) {
if (isMeasurable())
return ((Measurable) metricValueProvider).measure(config, now);
else if (this.metricValueProvider instanceof Gauge)
return ((Gauge<?>) metricValueProvider).value(config, now);
else
throw new IllegalStateException("Not a valid metric: " + this.metricValueProvider.getClass());
return metricValueProvider.value(config, now);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we update the docs so they match these changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder, I will update this doc.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,26 @@
public interface Measurable extends MetricValueProvider<Double> {

/**
* Measure this quantity and return the result as a double
* Measure this quantity and return the result as a double.
*
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
* @return The measured value
*/
double measure(MetricConfig config, long now);

/**
* Measure this quantity and return the result as a double.
*
* This default implementation delegates to {@link #measure(MetricConfig, long)}.
*
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
* @return The measured value as a {@link Double}
*/
@Override
default Double value(MetricConfig config, long now) {
return measure(config, now);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@
/**
* Super-interface for {@link Measurable} or {@link Gauge} that provides
* metric values.
* <p>
* In the future for Java8 and above, {@link Gauge#value(MetricConfig, long)} will be
* moved to this interface with a default implementation in {@link Measurable} that returns
* {@link Measurable#measure(MetricConfig, long)}.
* </p>
*/
public interface MetricValueProvider<T> { }
@FunctionalInterface
public interface MetricValueProvider<T> {

/**
* Returns the current value associated with this metric.
*
* @param config The configuration for this metric
* @param now The POSIX time in milliseconds the measurement is being taken
* @return the current metric value
*/
T value(MetricConfig config, long now);

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,29 @@ public void testIsMeasurableWithGaugeProvider() {
assertThrows(IllegalStateException.class, metric::measurable);
}

@Test
public void testMeasurableValueReturnsZeroWhenNotMeasurable() {
MockTime time = new MockTime();
MetricConfig config = new MetricConfig();
Gauge<Integer> gauge = (c, now) -> 7;

KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, gauge, config, time);
assertEquals(0.0d, metric.measurableValue(time.milliseconds()), 0.0d);
}

@Test
public void testKafkaMetricAcceptsNonMeasurableNonGaugeProvider() {
MetricValueProvider<String> provider = (config, now) -> "metric value provider";
KafkaMetric metric = new KafkaMetric(new Object(), METRIC_NAME, provider, new MetricConfig(), new MockTime());

Object value = metric.metricValue();
assertEquals("metric value provider", value);
}

@Test
public void testConstructorWithNullProvider() {
assertThrows(NullPointerException.class, () ->
new KafkaMetric(new Object(), METRIC_NAME, null, new MetricConfig(), new MockTime())
);
}
}