From 453762fac38560f068b09fbea1c049abba0c2cfe Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 21 Dec 2023 04:38:51 +0800 Subject: [PATCH 01/13] PIP-223 --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../pulsar/broker/ServiceConfiguration.java | 5 + .../broker/web/RestEndpointMetricsFilter.java | 117 ++++++++++++++++++ .../apache/pulsar/broker/web/WebService.java | 3 + .../stats/BrokerRestEndpointMetricsTest.java | 95 ++++++++++++++ 6 files changed, 226 insertions(+) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java diff --git a/conf/broker.conf b/conf/broker.conf index 82dd5640740c0..f528ad19eab02 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1533,6 +1533,9 @@ authenticateMetricsEndpoint=false # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Enable consumer level metrics. default is false exposeConsumerLevelMetricsInPrometheus=false diff --git a/conf/standalone.conf b/conf/standalone.conf index cf13f12c8fe6f..3e648b9d5ec9e 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -968,6 +968,9 @@ webSocketMaxTextFrameSize=1048576 # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Time in milliseconds that metrics endpoint would time out. Default is 30s. # Increase it if there are a lot of topics to expose topic-level metrics. # Set it to 0 to disable timeout. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4f2d56fc07ea7..1c00067471abf 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2962,6 +2962,11 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private boolean exposeBundlesMetricsInPrometheus = false; + @FieldContext( + category = CATEGORY_METRICS, + doc = "Enable expose per rest endpoint metrics of the broker.") + private boolean exposePerRestEndpointMetricsInPrometheus = false; + /**** --- Functions. --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java new file mode 100644 index 0000000000000..c16cc96ba527c --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; +import java.io.IOException; +import java.util.Stack; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.container.ContainerResponseFilter; +import javax.ws.rs.core.Response; +import org.glassfish.jersey.server.internal.routing.UriRoutingContext; +import org.glassfish.jersey.server.model.Resource; +import org.glassfish.jersey.server.model.ResourceMethod; +import org.jetbrains.annotations.NotNull; + +public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { + private static final LoadingCache CACHE = CacheBuilder + .newBuilder() + .maximumSize(100) + .build(new CacheLoader<>() { + @Override + public @NotNull String load(@NotNull ResourceMethod method) throws Exception { + return getRestPath(method); + } + }); + + private static final Histogram LATENCY = Histogram + .build("pulsar_broker_rest_endpoint_latency", "-") + .unit("ms") + .labelNames("path", "method") + .buckets(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D) + .register(); + private static final Counter FAILED = Counter + .build("pulsar_broker_rest_endpoint_failed", "-") + .labelNames("path", "method", "code") + .register(); + + private static final String REQUEST_START_TIME = "requestStartTime"; + + @Override + public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { + String path; + try { + UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); + ResourceMethod rm = info.getMatchedResourceMethod(); + path = CACHE.get(rm); + } catch (Throwable ex) { + path = "UNKNOWN"; + } + + String method = req.getMethod(); + Response.StatusType status = resp.getStatusInfo(); + if (status.getStatusCode() < Response.Status.BAD_REQUEST.getStatusCode()) { + long start = req.getProperty(REQUEST_START_TIME) == null + ? System.currentTimeMillis() : (long) req.getProperty(REQUEST_START_TIME); + LATENCY.labels(path, method).observe(System.currentTimeMillis() - start); + } else { + FAILED.labels(path, method, String.valueOf(status.getStatusCode())).inc(); + } + } + + @Override + public void filter(ContainerRequestContext req) throws IOException { + // Set the request start time into properties. + req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); + } + + private static String getRestPath(ResourceMethod method) { + try { + StringBuilder fullPath = new StringBuilder(); + Stack pathStack = new Stack<>(); + Resource parent = method.getParent(); + + while (true) { + String path = parent.getPath(); + parent = parent.getParent(); + if (parent == null) { + if (!path.endsWith("/") && !pathStack.peek().startsWith("/")) { + pathStack.push("/"); + } + pathStack.push(path); + break; + } + pathStack.push(path); + + } + while (!pathStack.isEmpty()) { + fullPath.append(pathStack.pop().replace("{", ":").replace("}", "")); + } + return fullPath.toString(); + } catch (Exception ex) { + return "UNKNOWN"; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 902593b7bf678..05d8f82ddb5be 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -192,6 +192,9 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, config.register(JsonMapperProvider.class); } config.register(MultiPartFeature.class); + if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) { + config.register(RestEndpointMetricsFilter.class); + } ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java new file mode 100644 index 0000000000000..b14044420ee55 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.google.common.collect.Multimap; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Set; +import java.util.UUID; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class BrokerRestEndpointMetricsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setExposePerRestEndpointMetricsInPrometheus(true); + baseSetup(); + } + + @BeforeMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testMetrics() throws Exception { + admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); + admin.namespaces().createNamespace("test/test"); + String topic = "persistent://test/test/test_" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().getList("test/test"); + + // This request will be failed + try { + admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); + } catch (Exception e) { + // ignore + } + + admin.topics().delete(topic, true); + admin.namespaces().deleteNamespace("test/test"); + admin.tenants().deleteTenant("test"); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + + String metricsStr = output.toString(StandardCharsets.UTF_8); + Multimap metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr); + + Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); + Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); + + Assert.assertTrue(latency.size() > 0); + Assert.assertTrue(failed.size() > 0); + + for (PrometheusMetricsTest.Metric m : latency) { + Assert.assertNotNull(m.tags.get("cluster")); + Assert.assertNotNull(m.tags.get("path")); + Assert.assertNotNull(m.tags.get("method")); + } + + for (PrometheusMetricsTest.Metric m : failed) { + Assert.assertNotNull(m.tags.get("cluster")); + Assert.assertNotNull(m.tags.get("path")); + Assert.assertNotNull(m.tags.get("method")); + Assert.assertNotNull(m.tags.get("code")); + } + } +} \ No newline at end of file From e139404a60c98d76b675ec4a802962e0b3910a52 Mon Sep 17 00:00:00 2001 From: DaoJun Date: Thu, 29 Feb 2024 17:06:56 +0800 Subject: [PATCH 02/13] fix code conflicts --- .../broker/stats/BrokerRestEndpointMetricsTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index b14044420ee55..681ebc3f51318 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.UUID; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; @@ -71,21 +72,21 @@ public void testMetrics() throws Exception { PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); String metricsStr = output.toString(StandardCharsets.UTF_8); - Multimap metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr); + Multimap metricsMap = PrometheusMetricsClient.parseMetrics(metricsStr); - Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); - Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); + Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); + Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); Assert.assertTrue(latency.size() > 0); Assert.assertTrue(failed.size() > 0); - for (PrometheusMetricsTest.Metric m : latency) { + for (PrometheusMetricsClient.Metric m : latency) { Assert.assertNotNull(m.tags.get("cluster")); Assert.assertNotNull(m.tags.get("path")); Assert.assertNotNull(m.tags.get("method")); } - for (PrometheusMetricsTest.Metric m : failed) { + for (PrometheusMetricsClient.Metric m : failed) { Assert.assertNotNull(m.tags.get("cluster")); Assert.assertNotNull(m.tags.get("path")); Assert.assertNotNull(m.tags.get("method")); From 395c78084cdbd47fd84f9e4a4686d358c10d573a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 6 Mar 2024 01:30:26 +0800 Subject: [PATCH 03/13] OTEL --- .../apache/pulsar/broker/PulsarService.java | 2 +- .../broker/web/RestEndpointMetricsFilter.java | 81 ++++++++++++++----- .../apache/pulsar/broker/web/WebService.java | 2 +- .../stats/BrokerRestEndpointMetricsTest.java | 51 +++++------- 4 files changed, 84 insertions(+), 52 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3701f354b62b0..62b759e1a733b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -751,6 +751,7 @@ public void start() throws PulsarServerException { config.getDefaultRetentionTimeInMinutes() * 60)); } + this.openTelemetry = new PulsarBrokerOpenTelemetry(config); localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) : null; @@ -902,7 +903,6 @@ public void start() throws PulsarServerException { } this.metricsGenerator = new MetricsGenerator(this); - this.openTelemetry = new PulsarBrokerOpenTelemetry(config); // Initialize the message protocol handlers. // start the protocol handlers only after the broker is ready, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index c16cc96ba527c..7303b5583e66b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -21,24 +21,32 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import io.prometheus.client.Counter; -import io.prometheus.client.Histogram; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; import java.io.IOException; +import java.time.Duration; +import java.util.List; import java.util.Stack; +import javax.validation.constraints.NotNull; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerRequestFilter; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.container.ContainerResponseFilter; import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.glassfish.jersey.server.internal.routing.UriRoutingContext; import org.glassfish.jersey.server.model.Resource; import org.glassfish.jersey.server.model.ResourceMethod; -import org.jetbrains.annotations.NotNull; public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { - private static final LoadingCache CACHE = CacheBuilder + private final LoadingCache CACHE = CacheBuilder .newBuilder() .maximumSize(100) + .expireAfterAccess(Duration.ofMinutes(1)) .build(new CacheLoader<>() { @Override public @NotNull String load(@NotNull ResourceMethod method) throws Exception { @@ -46,18 +54,35 @@ public class RestEndpointMetricsFilter implements ContainerResponseFilter, Conta } }); - private static final Histogram LATENCY = Histogram - .build("pulsar_broker_rest_endpoint_latency", "-") - .unit("ms") - .labelNames("path", "method") - .buckets(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D) - .register(); - private static final Counter FAILED = Counter - .build("pulsar_broker_rest_endpoint_failed", "-") - .labelNames("path", "method", "code") - .register(); - private static final String REQUEST_START_TIME = "requestStartTime"; + private static final AttributeKey PATH = AttributeKey.stringKey("path"); + private static final AttributeKey METHOD = AttributeKey.stringKey("method"); + private static final AttributeKey CODE = AttributeKey.stringKey("code"); + + private final DoubleHistogram latency; + private final LongCounter failed; + + private RestEndpointMetricsFilter(PulsarService pulsar) { + PulsarBrokerOpenTelemetry telemetry = pulsar.getOpenTelemetry(); + Meter meter = telemetry.getMeter(); + latency = meter.histogramBuilder("pulsar_broker_rest_endpoint_latency") + .setDescription("-") + .setUnit("ms") + .setExplicitBucketBoundariesAdvice(List.of(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)) + .build(); + failed = meter.counterBuilder("pulsar_broker_rest_endpoint_failed") + .setDescription("-") + .build(); + } + + private static volatile RestEndpointMetricsFilter INSTANCE; + + public static synchronized RestEndpointMetricsFilter create(PulsarService pulsar) { + if (INSTANCE == null) { + INSTANCE = new RestEndpointMetricsFilter(pulsar); + } + return INSTANCE; + } @Override public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { @@ -72,12 +97,15 @@ public void filter(ContainerRequestContext req, ContainerResponseContext resp) t String method = req.getMethod(); Response.StatusType status = resp.getStatusInfo(); - if (status.getStatusCode() < Response.Status.BAD_REQUEST.getStatusCode()) { - long start = req.getProperty(REQUEST_START_TIME) == null - ? System.currentTimeMillis() : (long) req.getProperty(REQUEST_START_TIME); - LATENCY.labels(path, method).observe(System.currentTimeMillis() - start); - } else { - FAILED.labels(path, method, String.valueOf(status.getStatusCode())).inc(); + // record failure + if (status.getStatusCode() >= Response.Status.BAD_REQUEST.getStatusCode()) { + recordFailure(path, method, status.getStatusCode()); + return; + } + // record success + Object o = req.getProperty(REQUEST_START_TIME); + if (o instanceof Long start) { + recordSuccess(path, method, System.currentTimeMillis() - start); } } @@ -87,6 +115,17 @@ public void filter(ContainerRequestContext req) throws IOException { req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); } + + private void recordSuccess(String path, String method, long duration) { + Attributes attributes = Attributes.of(PATH, path, METHOD, method); + latency.record(duration, attributes); + } + + private void recordFailure(String path, String method, int code) { + Attributes attributes = Attributes.of(PATH, path, METHOD, method, CODE, String.valueOf(code)); + failed.add(1, attributes); + } + private static String getRestPath(ResourceMethod method) { try { StringBuilder fullPath = new StringBuilder(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 05d8f82ddb5be..9844a10b8d8c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -193,7 +193,7 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, } config.register(MultiPartFeature.class); if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) { - config.register(RestEndpointMetricsFilter.class); + config.register(RestEndpointMetricsFilter.create(pulsar)); } ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index 681ebc3f51318..53d618429a28a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -18,17 +18,11 @@ */ package org.apache.pulsar.broker.stats; -import com.google.common.collect.Multimap; -import java.io.ByteArrayOutputStream; -import java.nio.charset.StandardCharsets; -import java.util.Collection; import java.util.Set; import java.util.UUID; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -68,29 +62,28 @@ public void testMetrics() throws Exception { admin.namespaces().deleteNamespace("test/test"); admin.tenants().deleteTenant("test"); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + // TODO: add more test cases + PrometheusMetricsClient client = new PrometheusMetricsClient("127.0.0.1", pulsar.getListenPortHTTP().get()); + PrometheusMetricsClient.Metrics metrics = client.getMetrics(); + System.out.println(); - String metricsStr = output.toString(StandardCharsets.UTF_8); - Multimap metricsMap = PrometheusMetricsClient.parseMetrics(metricsStr); - - Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); - Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); - - Assert.assertTrue(latency.size() > 0); - Assert.assertTrue(failed.size() > 0); - - for (PrometheusMetricsClient.Metric m : latency) { - Assert.assertNotNull(m.tags.get("cluster")); - Assert.assertNotNull(m.tags.get("path")); - Assert.assertNotNull(m.tags.get("method")); - } - - for (PrometheusMetricsClient.Metric m : failed) { - Assert.assertNotNull(m.tags.get("cluster")); - Assert.assertNotNull(m.tags.get("path")); - Assert.assertNotNull(m.tags.get("method")); - Assert.assertNotNull(m.tags.get("code")); - } +// Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); +// Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); +// +// Assert.assertTrue(latency.size() > 0); +// Assert.assertTrue(failed.size() > 0); +// +// for (PrometheusMetricsClient.Metric m : latency) { +// Assert.assertNotNull(m.tags.get("cluster")); +// Assert.assertNotNull(m.tags.get("path")); +// Assert.assertNotNull(m.tags.get("method")); +// } +// +// for (PrometheusMetricsClient.Metric m : failed) { +// Assert.assertNotNull(m.tags.get("cluster")); +// Assert.assertNotNull(m.tags.get("path")); +// Assert.assertNotNull(m.tags.get("method")); +// Assert.assertNotNull(m.tags.get("code")); +// } } } \ No newline at end of file From 552a4ac3ec833c5f57d2bd4bd8a6a25ceec9b7da Mon Sep 17 00:00:00 2001 From: dao-jun Date: Wed, 6 Mar 2024 01:36:15 +0800 Subject: [PATCH 04/13] fix var name --- .../pulsar/broker/web/RestEndpointMetricsFilter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index 7303b5583e66b..788ef3376f3ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -43,7 +43,7 @@ import org.glassfish.jersey.server.model.ResourceMethod; public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { - private final LoadingCache CACHE = CacheBuilder + private final LoadingCache cache = CacheBuilder .newBuilder() .maximumSize(100) .expireAfterAccess(Duration.ofMinutes(1)) @@ -75,13 +75,13 @@ private RestEndpointMetricsFilter(PulsarService pulsar) { .build(); } - private static volatile RestEndpointMetricsFilter INSTANCE; + private static volatile RestEndpointMetricsFilter instance; public static synchronized RestEndpointMetricsFilter create(PulsarService pulsar) { - if (INSTANCE == null) { - INSTANCE = new RestEndpointMetricsFilter(pulsar); + if (instance == null) { + instance = new RestEndpointMetricsFilter(pulsar); } - return INSTANCE; + return instance; } @Override @@ -90,7 +90,7 @@ public void filter(ContainerRequestContext req, ContainerResponseContext resp) t try { UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); ResourceMethod rm = info.getMatchedResourceMethod(); - path = CACHE.get(rm); + path = cache.get(rm); } catch (Throwable ex) { path = "UNKNOWN"; } From bc18d1189021d4c865d22a35ba6f29d81b2ad70a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 7 Mar 2024 06:56:29 +0800 Subject: [PATCH 05/13] fix var name --- .../apache/pulsar/broker/PulsarService.java | 2 - .../broker/web/RestEndpointMetricsFilter.java | 55 +++++++++---------- .../apache/pulsar/broker/web/WebService.java | 2 +- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b0cee2e51c52e..089c679170bfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -761,8 +761,6 @@ public void start() throws PulsarServerException { config.getBacklogQuotaDefaultLimitSecond(), config.getDefaultRetentionTimeInMinutes() * 60)); } - - this.openTelemetry = new PulsarBrokerOpenTelemetry(config); localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index 788ef3376f3ab..376148bbe858f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -26,6 +26,11 @@ import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.semconv.SemanticAttributes; +import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; +import org.glassfish.jersey.server.internal.routing.UriRoutingContext; +import org.glassfish.jersey.server.model.Resource; +import org.glassfish.jersey.server.model.ResourceMethod; import java.io.IOException; import java.time.Duration; import java.util.List; @@ -36,76 +41,69 @@ import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.container.ContainerResponseFilter; import javax.ws.rs.core.Response; -import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; -import org.glassfish.jersey.server.internal.routing.UriRoutingContext; -import org.glassfish.jersey.server.model.Resource; -import org.glassfish.jersey.server.model.ResourceMethod; public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { - private final LoadingCache cache = CacheBuilder + private final LoadingCache cache = CacheBuilder .newBuilder() .maximumSize(100) .expireAfterAccess(Duration.ofMinutes(1)) .build(new CacheLoader<>() { @Override - public @NotNull String load(@NotNull ResourceMethod method) throws Exception { - return getRestPath(method); + public @NotNull Attributes load(@NotNull ResourceMethod method) throws Exception { + return Attributes.of(PATH, getRestPath(method), METHOD, method.getHttpMethod()); } }); private static final String REQUEST_START_TIME = "requestStartTime"; - private static final AttributeKey PATH = AttributeKey.stringKey("path"); - private static final AttributeKey METHOD = AttributeKey.stringKey("method"); - private static final AttributeKey CODE = AttributeKey.stringKey("code"); + private static final AttributeKey PATH = SemanticAttributes.URL_PATH; + private static final AttributeKey METHOD = SemanticAttributes.HTTP_REQUEST_METHOD; + private static final AttributeKey CODE = SemanticAttributes.HTTP_RESPONSE_STATUS_CODE; private final DoubleHistogram latency; private final LongCounter failed; - private RestEndpointMetricsFilter(PulsarService pulsar) { - PulsarBrokerOpenTelemetry telemetry = pulsar.getOpenTelemetry(); - Meter meter = telemetry.getMeter(); + private RestEndpointMetricsFilter(PulsarBrokerOpenTelemetry openTelemetry) { + Meter meter = openTelemetry.getMeter(); latency = meter.histogramBuilder("pulsar_broker_rest_endpoint_latency") - .setDescription("-") + .setDescription("Latency of REST endpoints in Pulsar broker") .setUnit("ms") .setExplicitBucketBoundariesAdvice(List.of(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)) .build(); failed = meter.counterBuilder("pulsar_broker_rest_endpoint_failed") - .setDescription("-") + .setDescription("Number of failed REST endpoints in Pulsar broker") .build(); } private static volatile RestEndpointMetricsFilter instance; - public static synchronized RestEndpointMetricsFilter create(PulsarService pulsar) { + public static synchronized RestEndpointMetricsFilter create(PulsarBrokerOpenTelemetry openTelemetry) { if (instance == null) { - instance = new RestEndpointMetricsFilter(pulsar); + instance = new RestEndpointMetricsFilter(openTelemetry); } return instance; } @Override public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { - String path; + Attributes attrs; try { UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); ResourceMethod rm = info.getMatchedResourceMethod(); - path = cache.get(rm); + attrs = cache.get(rm); } catch (Throwable ex) { - path = "UNKNOWN"; + attrs = Attributes.of(PATH, "UNKNOWN", METHOD, req.getMethod()); } - String method = req.getMethod(); Response.StatusType status = resp.getStatusInfo(); // record failure if (status.getStatusCode() >= Response.Status.BAD_REQUEST.getStatusCode()) { - recordFailure(path, method, status.getStatusCode()); + recordFailure(attrs, status.getStatusCode()); return; } // record success Object o = req.getProperty(REQUEST_START_TIME); if (o instanceof Long start) { - recordSuccess(path, method, System.currentTimeMillis() - start); + recordSuccess(attrs, System.currentTimeMillis() - start); } } @@ -116,13 +114,12 @@ public void filter(ContainerRequestContext req) throws IOException { } - private void recordSuccess(String path, String method, long duration) { - Attributes attributes = Attributes.of(PATH, path, METHOD, method); - latency.record(duration, attributes); + private void recordSuccess(Attributes attrs, long duration) { + latency.record(duration, attrs); } - private void recordFailure(String path, String method, int code) { - Attributes attributes = Attributes.of(PATH, path, METHOD, method, CODE, String.valueOf(code)); + private void recordFailure(Attributes attrs, long code) { + Attributes attributes = attrs.toBuilder().put(CODE, code).build(); failed.add(1, attributes); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 9844a10b8d8c7..eeeee2bf4f1ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -193,7 +193,7 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, } config.register(MultiPartFeature.class); if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) { - config.register(RestEndpointMetricsFilter.create(pulsar)); + config.register(RestEndpointMetricsFilter.create(pulsar.getOpenTelemetry())); } ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); From f894da96a32c8b85eb0aed3020a8e18f05885ef2 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Thu, 7 Mar 2024 17:41:56 +0800 Subject: [PATCH 06/13] fix code style --- .../pulsar/broker/web/RestEndpointMetricsFilter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index 376148bbe858f..db6a51c63dc77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -27,10 +27,6 @@ import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.semconv.SemanticAttributes; -import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; -import org.glassfish.jersey.server.internal.routing.UriRoutingContext; -import org.glassfish.jersey.server.model.Resource; -import org.glassfish.jersey.server.model.ResourceMethod; import java.io.IOException; import java.time.Duration; import java.util.List; @@ -41,6 +37,10 @@ import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.container.ContainerResponseFilter; import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; +import org.glassfish.jersey.server.internal.routing.UriRoutingContext; +import org.glassfish.jersey.server.model.Resource; +import org.glassfish.jersey.server.model.ResourceMethod; public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { private final LoadingCache cache = CacheBuilder From aa513b17eeb4461b0c56dcd436ea793e32dfaec1 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 11 Mar 2024 19:49:13 +0800 Subject: [PATCH 07/13] Update code --- conf/broker.conf | 3 - conf/standalone.conf | 3 - .../pulsar/broker/ServiceConfiguration.java | 5 - .../broker/web/RestEndpointMetricsFilter.java | 96 ++++++------------- .../apache/pulsar/broker/web/WebService.java | 5 +- .../stats/BrokerRestEndpointMetricsTest.java | 1 - 6 files changed, 30 insertions(+), 83 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 94b1d76394d66..ea98ad4a9b5d2 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1538,9 +1538,6 @@ authenticateMetricsEndpoint=false # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true -# Enable expose per rest endpoint metrics of the broker. -exposePerRestEndpointMetricsInPrometheus=false - # Enable consumer level metrics. default is false exposeConsumerLevelMetricsInPrometheus=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 0801a15dc3dc6..a916a2f477e8f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -968,9 +968,6 @@ webSocketMaxTextFrameSize=1048576 # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true -# Enable expose per rest endpoint metrics of the broker. -exposePerRestEndpointMetricsInPrometheus=false - # Time in milliseconds that metrics endpoint would time out. Default is 30s. # Increase it if there are a lot of topics to expose topic-level metrics. # Set it to 0 to disable timeout. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a5ac981a896f9..e088f50a05c88 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2983,11 +2983,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private boolean exposeBundlesMetricsInPrometheus = false; - @FieldContext( - category = CATEGORY_METRICS, - doc = "Enable expose per rest endpoint metrics of the broker.") - private boolean exposePerRestEndpointMetricsInPrometheus = false; - /**** --- Functions. --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index db6a51c63dc77..f2895e3b625e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -18,49 +18,31 @@ */ package org.apache.pulsar.broker.web; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.DoubleHistogram; -import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.semconv.SemanticAttributes; import java.io.IOException; -import java.time.Duration; import java.util.List; -import java.util.Stack; -import javax.validation.constraints.NotNull; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerRequestFilter; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.container.ContainerResponseFilter; import javax.ws.rs.core.Response; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; import org.glassfish.jersey.server.internal.routing.UriRoutingContext; -import org.glassfish.jersey.server.model.Resource; import org.glassfish.jersey.server.model.ResourceMethod; +import org.glassfish.jersey.uri.UriTemplate; public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { - private final LoadingCache cache = CacheBuilder - .newBuilder() - .maximumSize(100) - .expireAfterAccess(Duration.ofMinutes(1)) - .build(new CacheLoader<>() { - @Override - public @NotNull Attributes load(@NotNull ResourceMethod method) throws Exception { - return Attributes.of(PATH, getRestPath(method), METHOD, method.getHttpMethod()); - } - }); - private static final String REQUEST_START_TIME = "requestStartTime"; private static final AttributeKey PATH = SemanticAttributes.URL_PATH; private static final AttributeKey METHOD = SemanticAttributes.HTTP_REQUEST_METHOD; private static final AttributeKey CODE = SemanticAttributes.HTTP_RESPONSE_STATUS_CODE; private final DoubleHistogram latency; - private final LongCounter failed; private RestEndpointMetricsFilter(PulsarBrokerOpenTelemetry openTelemetry) { Meter meter = openTelemetry.getMeter(); @@ -69,18 +51,15 @@ private RestEndpointMetricsFilter(PulsarBrokerOpenTelemetry openTelemetry) { .setUnit("ms") .setExplicitBucketBoundariesAdvice(List.of(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)) .build(); - failed = meter.counterBuilder("pulsar_broker_rest_endpoint_failed") - .setDescription("Number of failed REST endpoints in Pulsar broker") - .build(); } private static volatile RestEndpointMetricsFilter instance; public static synchronized RestEndpointMetricsFilter create(PulsarBrokerOpenTelemetry openTelemetry) { - if (instance == null) { - instance = new RestEndpointMetricsFilter(openTelemetry); - } - return instance; + if (instance == null) { + instance = new RestEndpointMetricsFilter(openTelemetry); + } + return instance; } @Override @@ -88,23 +67,19 @@ public void filter(ContainerRequestContext req, ContainerResponseContext resp) t Attributes attrs; try { UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); - ResourceMethod rm = info.getMatchedResourceMethod(); - attrs = cache.get(rm); + attrs = getRequestAttributes(info); } catch (Throwable ex) { attrs = Attributes.of(PATH, "UNKNOWN", METHOD, req.getMethod()); } Response.StatusType status = resp.getStatusInfo(); - // record failure - if (status.getStatusCode() >= Response.Status.BAD_REQUEST.getStatusCode()) { - recordFailure(attrs, status.getStatusCode()); - return; - } - // record success + int statusCode = status.getStatusCode(); Object o = req.getProperty(REQUEST_START_TIME); - if (o instanceof Long start) { - recordSuccess(attrs, System.currentTimeMillis() - start); + if (!(o instanceof Long start)) { + return; } + long latency = System.currentTimeMillis() - start; + recordLatency(attrs, statusCode, latency); } @Override @@ -114,40 +89,25 @@ public void filter(ContainerRequestContext req) throws IOException { } - private void recordSuccess(Attributes attrs, long duration) { + private void recordLatency(Attributes attrs, int code, long duration) { + attrs = attrs.toBuilder().put(CODE, (long) code).build(); latency.record(duration, attrs); } - private void recordFailure(Attributes attrs, long code) { - Attributes attributes = attrs.toBuilder().put(CODE, code).build(); - failed.add(1, attributes); - } - - private static String getRestPath(ResourceMethod method) { - try { - StringBuilder fullPath = new StringBuilder(); - Stack pathStack = new Stack<>(); - Resource parent = method.getParent(); - - while (true) { - String path = parent.getPath(); - parent = parent.getParent(); - if (parent == null) { - if (!path.endsWith("/") && !pathStack.peek().startsWith("/")) { - pathStack.push("/"); - } - pathStack.push(path); - break; - } - pathStack.push(path); - - } - while (!pathStack.isEmpty()) { - fullPath.append(pathStack.pop().replace("{", ":").replace("}", "")); - } - return fullPath.toString(); - } catch (Exception ex) { - return "UNKNOWN"; + private static Attributes getRequestAttributes(UriRoutingContext ctx) { + List templates = ctx.getMatchedTemplates(); + ResourceMethod method = ctx.getMatchedResourceMethod(); + String httpMethod = method == null ? "UNKNOWN" : method.getHttpMethod(); + if (CollectionUtils.isEmpty(templates)) { + return Attributes.of(PATH, "UNKNOWN", METHOD, httpMethod); + } + UriTemplate[] arr = templates.toArray(new UriTemplate[0]); + int idx = arr.length - 1; + StringBuilder builder = new StringBuilder(); + for (; idx >= 0; idx--) { + builder.append(arr[idx].getTemplate()); } + String template = builder.toString().replace("{", ":").replace("}", ""); + return Attributes.of(PATH, template, METHOD, httpMethod); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index eeeee2bf4f1ea..d438b7a1e23fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -192,9 +192,8 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, config.register(JsonMapperProvider.class); } config.register(MultiPartFeature.class); - if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) { - config.register(RestEndpointMetricsFilter.create(pulsar.getOpenTelemetry())); - } + // Register the filter that will collect metrics for REST endpoints + config.register(RestEndpointMetricsFilter.create(pulsar.getOpenTelemetry())); ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index 53d618429a28a..42ec3c25b3fdc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -32,7 +32,6 @@ public class BrokerRestEndpointMetricsTest extends BrokerTestBase { @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { - conf.setExposePerRestEndpointMetricsInPrometheus(true); baseSetup(); } From 76dcde1ca05c0847fdbfc25b71e9705f9c822778 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 11 Mar 2024 20:37:37 +0800 Subject: [PATCH 08/13] Fix test --- .../stats/BrokerRestEndpointMetricsTest.java | 87 +++++++++++++------ 1 file changed, 62 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index 42ec3c25b3fdc..4b4855a461214 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -17,12 +17,20 @@ * under the License. */ package org.apache.pulsar.broker.stats; - +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.Data; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.Collection; +import java.util.Optional; import java.util.Set; import java.util.UUID; import org.apache.pulsar.broker.service.BrokerTestBase; -import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -41,6 +49,12 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + @Test public void testMetrics() throws Exception { @@ -61,28 +75,51 @@ public void testMetrics() throws Exception { admin.namespaces().deleteNamespace("test/test"); admin.tenants().deleteTenant("test"); - // TODO: add more test cases - PrometheusMetricsClient client = new PrometheusMetricsClient("127.0.0.1", pulsar.getListenPortHTTP().get()); - PrometheusMetricsClient.Metrics metrics = client.getMetrics(); - System.out.println(); - -// Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); -// Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); -// -// Assert.assertTrue(latency.size() > 0); -// Assert.assertTrue(failed.size() > 0); -// -// for (PrometheusMetricsClient.Metric m : latency) { -// Assert.assertNotNull(m.tags.get("cluster")); -// Assert.assertNotNull(m.tags.get("path")); -// Assert.assertNotNull(m.tags.get("method")); -// } -// -// for (PrometheusMetricsClient.Metric m : failed) { -// Assert.assertNotNull(m.tags.get("cluster")); -// Assert.assertNotNull(m.tags.get("path")); -// Assert.assertNotNull(m.tags.get("method")); -// Assert.assertNotNull(m.tags.get("code")); -// } + Collection metricDatas = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + Optional optional = metricDatas.stream().filter(m -> m.getName() + .equals("pulsar_broker_rest_endpoint_latency")).findFirst(); + Assert.assertTrue(optional.isPresent()); + + MetricData metricData = optional.get(); + Assert.assertFalse(metricData.getDescription().isEmpty()); + Assert.assertEquals(metricData.getUnit(), "ms"); + Assert.assertEquals(metricData.getType(), MetricDataType.HISTOGRAM); + + @SuppressWarnings("unchecked") + Data data = (Data) metricData.getData(); + data.getPoints().forEach(point -> { + hasAttributes(point); + Assert.assertTrue(point.getCount() > 0); + Assert.assertTrue(point.getSum() > 0); + }); + + Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "DELETE")); + Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "PUT")); + Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "PUT")); + Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "DELETE")); + Assert.assertTrue(hasPoint(data, "/clusters/:cluster", "PUT")); + Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "PUT")); + Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "DELETE")); + } + + private static boolean hasPoint(Data data, String uri, String method) { + Collection points = data.getPoints(); + for (HistogramPointData point : points) { + Attributes attrs = point.getAttributes(); + + if (attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD).equals(method) + && attrs.get(SemanticAttributes.URL_PATH).equals(uri)) { + return true; + } + } + + return false; + } + + private static void hasAttributes(HistogramPointData data) { + Attributes attrs = data.getAttributes(); + Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD)); + Assert.assertNotNull(attrs.get(SemanticAttributes.URL_PATH)); + Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE)); } } \ No newline at end of file From f414210bbbaf09c3ce6bcff28b45f10857d44fe3 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Mon, 11 Mar 2024 20:42:21 +0800 Subject: [PATCH 09/13] Fix code --- .../broker/web/RestEndpointMetricsFilter.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index f2895e3b625e3..d40db8534a80f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -64,22 +64,21 @@ public static synchronized RestEndpointMetricsFilter create(PulsarBrokerOpenTele @Override public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { + Response.StatusType status = resp.getStatusInfo(); + int statusCode = status.getStatusCode(); Attributes attrs; try { UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); - attrs = getRequestAttributes(info); + attrs = getRequestAttributes(info, statusCode); } catch (Throwable ex) { - attrs = Attributes.of(PATH, "UNKNOWN", METHOD, req.getMethod()); + attrs = Attributes.of(PATH, "UNKNOWN", METHOD, req.getMethod(), CODE, (long) statusCode); } - Response.StatusType status = resp.getStatusInfo(); - int statusCode = status.getStatusCode(); Object o = req.getProperty(REQUEST_START_TIME); - if (!(o instanceof Long start)) { - return; + if (o instanceof Long start) { + long duration = System.currentTimeMillis() - start; + this.latency.record(duration, attrs); } - long latency = System.currentTimeMillis() - start; - recordLatency(attrs, statusCode, latency); } @Override @@ -88,18 +87,12 @@ public void filter(ContainerRequestContext req) throws IOException { req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); } - - private void recordLatency(Attributes attrs, int code, long duration) { - attrs = attrs.toBuilder().put(CODE, (long) code).build(); - latency.record(duration, attrs); - } - - private static Attributes getRequestAttributes(UriRoutingContext ctx) { + private static Attributes getRequestAttributes(UriRoutingContext ctx, long statusCode) { List templates = ctx.getMatchedTemplates(); ResourceMethod method = ctx.getMatchedResourceMethod(); String httpMethod = method == null ? "UNKNOWN" : method.getHttpMethod(); if (CollectionUtils.isEmpty(templates)) { - return Attributes.of(PATH, "UNKNOWN", METHOD, httpMethod); + return Attributes.of(PATH, "UNKNOWN", METHOD, httpMethod, CODE, statusCode); } UriTemplate[] arr = templates.toArray(new UriTemplate[0]); int idx = arr.length - 1; @@ -108,6 +101,6 @@ private static Attributes getRequestAttributes(UriRoutingContext ctx) { builder.append(arr[idx].getTemplate()); } String template = builder.toString().replace("{", ":").replace("}", ""); - return Attributes.of(PATH, template, METHOD, httpMethod); + return Attributes.of(PATH, template, METHOD, httpMethod, CODE, statusCode); } } From f8c3f8016c1a4e137272cdce73370c60238e52dc Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 12 Mar 2024 19:55:25 +0800 Subject: [PATCH 10/13] fix test --- .../broker/stats/BrokerRestEndpointMetricsTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index 4b4855a461214..f3222c5b03afd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -27,20 +27,21 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker") -public class BrokerRestEndpointMetricsTest extends BrokerTestBase { +public class BrokerRestEndpointMetricsTest extends MockedPulsarServiceBaseTest { @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { - baseSetup(); + super.internalSetup(); } @BeforeMethod(alwaysRun = true) @@ -58,6 +59,7 @@ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder b @Test public void testMetrics() throws Exception { + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); admin.namespaces().createNamespace("test/test"); String topic = "persistent://test/test/test_" + UUID.randomUUID(); From 5b9bf65d55531362854cf2237c32236ede4fdc69 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 12 Mar 2024 21:01:15 +0800 Subject: [PATCH 11/13] fix test --- .../stats/BrokerRestEndpointMetricsTest.java | 176 +++++++++--------- 1 file changed, 88 insertions(+), 88 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index f3222c5b03afd..7d649736debf9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -17,21 +17,21 @@ * under the License. */ package org.apache.pulsar.broker.stats; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.sdk.metrics.data.Data; -import io.opentelemetry.sdk.metrics.data.HistogramPointData; -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.data.MetricDataType; -import io.opentelemetry.semconv.SemanticAttributes; -import java.util.Collection; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +//import io.opentelemetry.api.common.Attributes; +//import io.opentelemetry.sdk.metrics.data.Data; +//import io.opentelemetry.sdk.metrics.data.HistogramPointData; +//import io.opentelemetry.sdk.metrics.data.MetricData; +//import io.opentelemetry.sdk.metrics.data.MetricDataType; +//import io.opentelemetry.semconv.SemanticAttributes; +//import java.util.Collection; +//import java.util.Optional; +//import java.util.Set; +//import java.util.UUID; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.testcontext.PulsarTestContext; -import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.TenantInfo; -import org.testng.Assert; +//import org.apache.pulsar.broker.testcontext.PulsarTestContext; +//import org.apache.pulsar.common.policies.data.ClusterData; +//import org.apache.pulsar.common.policies.data.TenantInfo; +//import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -50,78 +50,78 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Override - protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { - super.customizeMainPulsarTestContextBuilder(builder); - builder.enableOpenTelemetry(true); - } - - - @Test - public void testMetrics() throws Exception { - admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); - admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); - admin.namespaces().createNamespace("test/test"); - String topic = "persistent://test/test/test_" + UUID.randomUUID(); - admin.topics().createNonPartitionedTopic(topic); - admin.topics().getList("test/test"); - - // This request will be failed - try { - admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); - } catch (Exception e) { - // ignore - } - - admin.topics().delete(topic, true); - admin.namespaces().deleteNamespace("test/test"); - admin.tenants().deleteTenant("test"); - - Collection metricDatas = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); - Optional optional = metricDatas.stream().filter(m -> m.getName() - .equals("pulsar_broker_rest_endpoint_latency")).findFirst(); - Assert.assertTrue(optional.isPresent()); - - MetricData metricData = optional.get(); - Assert.assertFalse(metricData.getDescription().isEmpty()); - Assert.assertEquals(metricData.getUnit(), "ms"); - Assert.assertEquals(metricData.getType(), MetricDataType.HISTOGRAM); - - @SuppressWarnings("unchecked") - Data data = (Data) metricData.getData(); - data.getPoints().forEach(point -> { - hasAttributes(point); - Assert.assertTrue(point.getCount() > 0); - Assert.assertTrue(point.getSum() > 0); - }); - - Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "DELETE")); - Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "PUT")); - Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "PUT")); - Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "DELETE")); - Assert.assertTrue(hasPoint(data, "/clusters/:cluster", "PUT")); - Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "PUT")); - Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "DELETE")); - } - - private static boolean hasPoint(Data data, String uri, String method) { - Collection points = data.getPoints(); - for (HistogramPointData point : points) { - Attributes attrs = point.getAttributes(); - - if (attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD).equals(method) - && attrs.get(SemanticAttributes.URL_PATH).equals(uri)) { - return true; - } - } - - return false; - } - - private static void hasAttributes(HistogramPointData data) { - Attributes attrs = data.getAttributes(); - Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD)); - Assert.assertNotNull(attrs.get(SemanticAttributes.URL_PATH)); - Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE)); - } +// @Override +// protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { +// super.customizeMainPulsarTestContextBuilder(builder); +// builder.enableOpenTelemetry(true); +// } +// +// +// @Test +// public void testMetrics() throws Exception { +// admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); +// admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); +// admin.namespaces().createNamespace("test/test"); +// String topic = "persistent://test/test/test_" + UUID.randomUUID(); +// admin.topics().createNonPartitionedTopic(topic); +// admin.topics().getList("test/test"); +// +// // This request will be failed +// try { +// admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); +// } catch (Exception e) { +// // ignore +// } +// +// admin.topics().delete(topic, true); +// admin.namespaces().deleteNamespace("test/test"); +// admin.tenants().deleteTenant("test"); +// +// Collection metricDatas = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); +// Optional optional = metricDatas.stream().filter(m -> m.getName() +// .equals("pulsar_broker_rest_endpoint_latency")).findFirst(); +// Assert.assertTrue(optional.isPresent()); +// +// MetricData metricData = optional.get(); +// Assert.assertFalse(metricData.getDescription().isEmpty()); +// Assert.assertEquals(metricData.getUnit(), "ms"); +// Assert.assertEquals(metricData.getType(), MetricDataType.HISTOGRAM); +// +// @SuppressWarnings("unchecked") +// Data data = (Data) metricData.getData(); +// data.getPoints().forEach(point -> { +// hasAttributes(point); +// Assert.assertTrue(point.getCount() > 0); +// Assert.assertTrue(point.getSum() > 0); +// }); +// +// Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "DELETE")); +// Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "PUT")); +// Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "PUT")); +// Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "DELETE")); +// Assert.assertTrue(hasPoint(data, "/clusters/:cluster", "PUT")); +// Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "PUT")); +// Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "DELETE")); +// } +// +// private static boolean hasPoint(Data data, String uri, String method) { +// Collection points = data.getPoints(); +// for (HistogramPointData point : points) { +// Attributes attrs = point.getAttributes(); +// +// if (attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD).equals(method) +// && attrs.get(SemanticAttributes.URL_PATH).equals(uri)) { +// return true; +// } +// } +// +// return false; +// } +// +// private static void hasAttributes(HistogramPointData data) { +// Attributes attrs = data.getAttributes(); +// Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD)); +// Assert.assertNotNull(attrs.get(SemanticAttributes.URL_PATH)); +// Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE)); +// } } \ No newline at end of file From 434ff8d636d4acd7b0bc681c59e967e6f728e206 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 12 Mar 2024 22:42:06 +0800 Subject: [PATCH 12/13] fix test --- .../stats/BrokerRestEndpointMetricsTest.java | 179 +++++++++--------- 1 file changed, 91 insertions(+), 88 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index 7d649736debf9..3178332a83db9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -17,26 +17,29 @@ * under the License. */ package org.apache.pulsar.broker.stats; -//import io.opentelemetry.api.common.Attributes; -//import io.opentelemetry.sdk.metrics.data.Data; -//import io.opentelemetry.sdk.metrics.data.HistogramPointData; -//import io.opentelemetry.sdk.metrics.data.MetricData; -//import io.opentelemetry.sdk.metrics.data.MetricDataType; -//import io.opentelemetry.semconv.SemanticAttributes; -//import java.util.Collection; -//import java.util.Optional; -//import java.util.Set; -//import java.util.UUID; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.Data; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.Collection; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -//import org.apache.pulsar.broker.testcontext.PulsarTestContext; -//import org.apache.pulsar.common.policies.data.ClusterData; -//import org.apache.pulsar.common.policies.data.TenantInfo; -//import org.testng.Assert; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test(groups = "broker") public class BrokerRestEndpointMetricsTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(BrokerRestEndpointMetricsTest.class); @BeforeMethod(alwaysRun = true) @Override @@ -50,78 +53,78 @@ protected void cleanup() throws Exception { super.internalCleanup(); } -// @Override -// protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { -// super.customizeMainPulsarTestContextBuilder(builder); -// builder.enableOpenTelemetry(true); -// } -// -// -// @Test -// public void testMetrics() throws Exception { -// admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); -// admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); -// admin.namespaces().createNamespace("test/test"); -// String topic = "persistent://test/test/test_" + UUID.randomUUID(); -// admin.topics().createNonPartitionedTopic(topic); -// admin.topics().getList("test/test"); -// -// // This request will be failed -// try { -// admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); -// } catch (Exception e) { -// // ignore -// } -// -// admin.topics().delete(topic, true); -// admin.namespaces().deleteNamespace("test/test"); -// admin.tenants().deleteTenant("test"); -// -// Collection metricDatas = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); -// Optional optional = metricDatas.stream().filter(m -> m.getName() -// .equals("pulsar_broker_rest_endpoint_latency")).findFirst(); -// Assert.assertTrue(optional.isPresent()); -// -// MetricData metricData = optional.get(); -// Assert.assertFalse(metricData.getDescription().isEmpty()); -// Assert.assertEquals(metricData.getUnit(), "ms"); -// Assert.assertEquals(metricData.getType(), MetricDataType.HISTOGRAM); -// -// @SuppressWarnings("unchecked") -// Data data = (Data) metricData.getData(); -// data.getPoints().forEach(point -> { -// hasAttributes(point); -// Assert.assertTrue(point.getCount() > 0); -// Assert.assertTrue(point.getSum() > 0); -// }); -// -// Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "DELETE")); -// Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "PUT")); -// Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "PUT")); -// Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "DELETE")); -// Assert.assertTrue(hasPoint(data, "/clusters/:cluster", "PUT")); -// Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "PUT")); -// Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "DELETE")); -// } -// -// private static boolean hasPoint(Data data, String uri, String method) { -// Collection points = data.getPoints(); -// for (HistogramPointData point : points) { -// Attributes attrs = point.getAttributes(); -// -// if (attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD).equals(method) -// && attrs.get(SemanticAttributes.URL_PATH).equals(uri)) { -// return true; -// } -// } -// -// return false; -// } -// -// private static void hasAttributes(HistogramPointData data) { -// Attributes attrs = data.getAttributes(); -// Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD)); -// Assert.assertNotNull(attrs.get(SemanticAttributes.URL_PATH)); -// Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE)); -// } + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + + @Test + public void testMetrics() throws Exception { + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); + admin.namespaces().createNamespace("test/test"); + String topic = "persistent://test/test/test_" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().getList("test/test"); + + // This request will be failed + try { + admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); + } catch (Exception e) { + // ignore + } + + admin.topics().delete(topic, true); + admin.namespaces().deleteNamespace("test/test"); + admin.tenants().deleteTenant("test"); + + Collection metricDatas = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + Optional optional = metricDatas.stream().peek(m -> log.info("metric name: {}", m.getName())) + .filter(m -> m.getName().equals("pulsar_broker_rest_endpoint_latency")).findFirst(); + Assert.assertTrue(optional.isPresent()); + + MetricData metricData = optional.get(); + Assert.assertFalse(metricData.getDescription().isEmpty()); + Assert.assertEquals(metricData.getUnit(), "ms"); + Assert.assertEquals(metricData.getType(), MetricDataType.HISTOGRAM); + + @SuppressWarnings("unchecked") + Data data = (Data) metricData.getData(); + data.getPoints().forEach(point -> { + hasAttributes(point); + Assert.assertTrue(point.getCount() > 0); + Assert.assertTrue(point.getSum() > 0); + }); + + Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "DELETE")); + Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "PUT")); + Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "PUT")); + Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "DELETE")); + Assert.assertTrue(hasPoint(data, "/clusters/:cluster", "PUT")); + Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "PUT")); + Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "DELETE")); + } + + private static boolean hasPoint(Data data, String uri, String method) { + Collection points = data.getPoints(); + for (HistogramPointData point : points) { + Attributes attrs = point.getAttributes(); + + if (attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD).equals(method) + && attrs.get(SemanticAttributes.URL_PATH).equals(uri)) { + return true; + } + } + + return false; + } + + private static void hasAttributes(HistogramPointData data) { + Attributes attrs = data.getAttributes(); + Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD)); + Assert.assertNotNull(attrs.get(SemanticAttributes.URL_PATH)); + Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE)); + } } \ No newline at end of file From 94e8653c233d2c9b0b43a3a019e767918295401a Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 12 Mar 2024 23:48:48 +0800 Subject: [PATCH 13/13] fix test --- .../pulsar/broker/stats/BrokerRestEndpointMetricsTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index 3178332a83db9..a7983c6b01fc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -81,6 +81,7 @@ public void testMetrics() throws Exception { admin.tenants().deleteTenant("test"); Collection metricDatas = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + log.info("Metrics size: {}", metricDatas.size()); Optional optional = metricDatas.stream().peek(m -> log.info("metric name: {}", m.getName())) .filter(m -> m.getName().equals("pulsar_broker_rest_endpoint_latency")).findFirst(); Assert.assertTrue(optional.isPresent());