From a9a4c144520b5c6269d7b8332d16a72562e220ad Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 22 Nov 2022 01:50:34 +0800 Subject: [PATCH 1/5] Add restApi Metrics --- .../broker/web/RestEndpointMetricsFilter.java | 100 ++++++++++++++++++ .../apache/pulsar/broker/web/WebService.java | 1 + .../stats/BrokerRestEndpointMetricsTest.java | 94 ++++++++++++++++ 3 files changed, 195 insertions(+) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java new file mode 100644 index 0000000000000..56a7656da6df4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.concurrent.ExecutionException; +import javax.ws.rs.Path; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.container.ContainerResponseFilter; +import javax.ws.rs.core.Response; +import org.glassfish.jersey.server.internal.routing.UriRoutingContext; +import org.glassfish.jersey.server.model.Invocable; +import org.glassfish.jersey.server.model.MethodHandler; + +public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { + private static final Cache CACHE = CacheBuilder + .newBuilder() + .maximumSize(100) + .build(); + + private static final Histogram LATENCY = Histogram + .build("pulsar_broker_rest_endpoint_latency", "-") + .unit("ms") + .labelNames("path", "method") + .buckets(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D) + .register(); + private static final Counter FAILED = Counter + .build("pulsar_broker_rest_endpoint_failed", "-") + .labelNames("path", "method", "code") + .register(); + + private static final String REQUEST_START_TIME = "requestStartTime"; + + @Override + public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { + String path; + try { + UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); + Invocable inv = info.getMatchedResourceMethod().getInvocable(); + MethodHandler handler = inv.getHandler(); + Method handlingMethod = inv.getHandlingMethod(); + path = getRequestPath(handler, handlingMethod); + } catch (Throwable ex) { + path = "UNKNOWN"; + } + + String method = req.getMethod(); + Response.Status status = resp.getStatusInfo().toEnum(); + if (status.getStatusCode() < Response.Status.BAD_REQUEST.getStatusCode()) { + long start = req.getProperty(REQUEST_START_TIME) == null + ? System.currentTimeMillis() : (long) req.getProperty(REQUEST_START_TIME); + LATENCY.labels(path, method).observe(System.currentTimeMillis() - start); + } else { + FAILED.labels(path, method, status.name()).inc(); + } + } + + @Override + public void filter(ContainerRequestContext req) throws IOException { + // Set the request start time into properties. + req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); + } + + + private static String getRequestPath(MethodHandler handler, Method method) throws ExecutionException { + Class klass = handler.getHandlerClass(); + + return CACHE.get(method, () -> { + Path parent = klass.getDeclaredAnnotation(Path.class); + Path child = method.getDeclaredAnnotation(Path.class); + String parent0 = parent == null ? "" : parent.value(); + String child0 = child == null ? "" : child.value().replace("{", ":").replace("}", ""); + + return parent0.endsWith("/") || child0.startsWith("/") ? parent0 + child0 : parent0 + "/" + child0; + }); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 1bbcf0db78f69..ff0fda29e82e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -188,6 +188,7 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, config.register(JsonMapperProvider.class); } config.register(MultiPartFeature.class); + config.register(RestEndpointMetricsFilter.class); ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java new file mode 100644 index 0000000000000..57ce386cd12af --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.google.common.collect.Multimap; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Set; +import java.util.UUID; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class BrokerRestEndpointMetricsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + baseSetup(); + } + + @BeforeMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testMetrics() throws Exception { + admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); + admin.namespaces().createNamespace("test/test"); + String topic = "persistent://test/test/test_" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().getList("test/test"); + + // This request will be failed + try { + admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); + } catch (Exception e) { + // ignore + } + + admin.topics().delete(topic, true); + admin.namespaces().deleteNamespace("test/test"); + admin.tenants().deleteTenant("test"); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + + String metricsStr = output.toString(StandardCharsets.UTF_8); + Multimap metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr); + + Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); + Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); + + Assert.assertTrue(latency.size() > 0); + Assert.assertTrue(failed.size() > 0); + + for (PrometheusMetricsTest.Metric m : latency) { + Assert.assertNotNull(m.tags.get("cluster")); + Assert.assertNotNull(m.tags.get("path")); + Assert.assertNotNull(m.tags.get("method")); + } + + for (PrometheusMetricsTest.Metric m : failed) { + Assert.assertNotNull(m.tags.get("cluster")); + Assert.assertNotNull(m.tags.get("path")); + Assert.assertNotNull(m.tags.get("method")); + Assert.assertNotNull(m.tags.get("code")); + } + } +} From ffe9dccce7ab5f44aad2686c69b864dcc0606e86 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 22 Nov 2022 02:20:26 +0800 Subject: [PATCH 2/5] Add restApi Metrics --- .../broker/web/RestEndpointMetricsFilter.java | 50 ++++++++++--------- 1 file changed, 27 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index 56a7656da6df4..72ff78fdc1fbe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -18,13 +18,13 @@ */ package org.apache.pulsar.broker.web; -import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import io.prometheus.client.Counter; import io.prometheus.client.Histogram; import java.io.IOException; import java.lang.reflect.Method; -import java.util.concurrent.ExecutionException; import javax.ws.rs.Path; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerRequestFilter; @@ -33,13 +33,33 @@ import javax.ws.rs.core.Response; import org.glassfish.jersey.server.internal.routing.UriRoutingContext; import org.glassfish.jersey.server.model.Invocable; -import org.glassfish.jersey.server.model.MethodHandler; +import org.glassfish.jersey.server.model.ResourceMethod; public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { - private static final Cache CACHE = CacheBuilder + private static final LoadingCache CACHE = CacheBuilder .newBuilder() .maximumSize(100) - .build(); + .build(new CacheLoader<>() { + @Override + public String load(ResourceMethod method) throws Exception { + try { + Invocable inv = method.getInvocable(); + Class handlingClass = inv.getHandler().getHandlerClass(); + Method handlingMethod = inv.getHandlingMethod(); + + Path parent = handlingClass.getDeclaredAnnotation(Path.class); + Path child = handlingMethod.getDeclaredAnnotation(Path.class); + String parent0 = parent == null ? "" : parent.value(); + String child0 = child == null ? "" : child.value() + .replace("{", ":").replace("}", ""); + + return parent0.endsWith("/") || child0.startsWith("/") + ? parent0 + child0 : parent0 + "/" + child0; + } catch (Exception ex) { + return "UNKNOWN"; + } + } + }); private static final Histogram LATENCY = Histogram .build("pulsar_broker_rest_endpoint_latency", "-") @@ -59,10 +79,8 @@ public void filter(ContainerRequestContext req, ContainerResponseContext resp) t String path; try { UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); - Invocable inv = info.getMatchedResourceMethod().getInvocable(); - MethodHandler handler = inv.getHandler(); - Method handlingMethod = inv.getHandlingMethod(); - path = getRequestPath(handler, handlingMethod); + ResourceMethod rm = info.getMatchedResourceMethod(); + path = CACHE.get(rm); } catch (Throwable ex) { path = "UNKNOWN"; } @@ -83,18 +101,4 @@ public void filter(ContainerRequestContext req) throws IOException { // Set the request start time into properties. req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); } - - - private static String getRequestPath(MethodHandler handler, Method method) throws ExecutionException { - Class klass = handler.getHandlerClass(); - - return CACHE.get(method, () -> { - Path parent = klass.getDeclaredAnnotation(Path.class); - Path child = method.getDeclaredAnnotation(Path.class); - String parent0 = parent == null ? "" : parent.value(); - String child0 = child == null ? "" : child.value().replace("{", ":").replace("}", ""); - - return parent0.endsWith("/") || child0.startsWith("/") ? parent0 + child0 : parent0 + "/" + child0; - }); - } } From 08fe36fac6145c967dbef78df681013ccca2c63e Mon Sep 17 00:00:00 2001 From: daojun Date: Fri, 9 Dec 2022 17:56:54 +0800 Subject: [PATCH 3/5] Add config --- conf/broker.conf | 3 +++ conf/standalone.conf | 3 +++ .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++ .../main/java/org/apache/pulsar/broker/web/WebService.java | 4 +++- .../pulsar/broker/stats/BrokerRestEndpointMetricsTest.java | 1 + 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/conf/broker.conf b/conf/broker.conf index ace47fae2d1b3..6c3e81902dfbc 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1424,6 +1424,9 @@ authenticateMetricsEndpoint=false # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Enable consumer level metrics. default is false exposeConsumerLevelMetricsInPrometheus=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 9f22bb8dbe460..2a6dbd329289b 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -982,6 +982,9 @@ exposePublisherStats=true # Default is false. exposePreciseBacklogInPrometheus=false +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Enable splitting topic and partition label in Prometheus. # If enabled, a topic name will split into 2 parts, one is topic name without partition index, # another one is partition index, e.g. (topic=xxx, partition=0). diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0060dd4270008..1c6e63133f4b2 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2683,6 +2683,12 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se ) private boolean exposeBundlesMetricsInPrometheus = false; + @FieldContext( + dynamic = true, + category = CATEGORY_METRICS, + doc = "Enable expose per rest endpoint metrics of the broker.") + private boolean exposePerRestEndpointMetricsInPrometheus = false; + /**** --- Functions. --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index ff0fda29e82e8..d7186a3ee78f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -188,7 +188,9 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, config.register(JsonMapperProvider.class); } config.register(MultiPartFeature.class); - config.register(RestEndpointMetricsFilter.class); + if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) { + config.register(RestEndpointMetricsFilter.class); + } ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java index 57ce386cd12af..9a1a17cca6665 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -37,6 +37,7 @@ public class BrokerRestEndpointMetricsTest extends BrokerTestBase { @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { + conf.setExposePerRestEndpointMetricsInPrometheus(true); baseSetup(); } From 52d387cd9ab0f29a3998530bb5ec7800136191eb Mon Sep 17 00:00:00 2001 From: daojun Date: Fri, 9 Dec 2022 18:44:33 +0800 Subject: [PATCH 4/5] Reimplement `get rest path` --- .../broker/web/RestEndpointMetricsFilter.java | 50 ++++++++++++------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index 72ff78fdc1fbe..640d0de81e5a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -24,15 +24,14 @@ import io.prometheus.client.Counter; import io.prometheus.client.Histogram; import java.io.IOException; -import java.lang.reflect.Method; -import javax.ws.rs.Path; +import java.util.Stack; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerRequestFilter; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.container.ContainerResponseFilter; import javax.ws.rs.core.Response; import org.glassfish.jersey.server.internal.routing.UriRoutingContext; -import org.glassfish.jersey.server.model.Invocable; +import org.glassfish.jersey.server.model.Resource; import org.glassfish.jersey.server.model.ResourceMethod; public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { @@ -42,22 +41,7 @@ public class RestEndpointMetricsFilter implements ContainerResponseFilter, Conta .build(new CacheLoader<>() { @Override public String load(ResourceMethod method) throws Exception { - try { - Invocable inv = method.getInvocable(); - Class handlingClass = inv.getHandler().getHandlerClass(); - Method handlingMethod = inv.getHandlingMethod(); - - Path parent = handlingClass.getDeclaredAnnotation(Path.class); - Path child = handlingMethod.getDeclaredAnnotation(Path.class); - String parent0 = parent == null ? "" : parent.value(); - String child0 = child == null ? "" : child.value() - .replace("{", ":").replace("}", ""); - - return parent0.endsWith("/") || child0.startsWith("/") - ? parent0 + child0 : parent0 + "/" + child0; - } catch (Exception ex) { - return "UNKNOWN"; - } + return getRestPath(method); } }); @@ -101,4 +85,32 @@ public void filter(ContainerRequestContext req) throws IOException { // Set the request start time into properties. req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); } + + private static String getRestPath(ResourceMethod method) { + try { + StringBuilder fullPath = new StringBuilder(); + Stack pathStack = new Stack<>(); + Resource parent = method.getParent(); + + while (true) { + String path = parent.getPath(); + parent = parent.getParent(); + if (parent == null) { + if (!path.endsWith("/") && !pathStack.peek().startsWith("/")) { + pathStack.push("/"); + } + pathStack.push(path); + break; + } + pathStack.push(path); + + } + while (!pathStack.isEmpty()) { + fullPath.append(pathStack.pop().replace("{", ":").replace("}", "")); + } + return fullPath.toString(); + } catch (Exception ex) { + return "UNKNOWN"; + } + } } From ad6aa7b7510ec1d7ad1c1137b4197ae416d696bd Mon Sep 17 00:00:00 2001 From: daojun Date: Fri, 9 Dec 2022 18:48:11 +0800 Subject: [PATCH 5/5] Reimplement `get rest path` --- .../apache/pulsar/broker/web/RestEndpointMetricsFilter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java index 640d0de81e5a3..604e63492de9a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -70,13 +70,13 @@ public void filter(ContainerRequestContext req, ContainerResponseContext resp) t } String method = req.getMethod(); - Response.Status status = resp.getStatusInfo().toEnum(); + Response.StatusType status = resp.getStatusInfo(); if (status.getStatusCode() < Response.Status.BAD_REQUEST.getStatusCode()) { long start = req.getProperty(REQUEST_START_TIME) == null ? System.currentTimeMillis() : (long) req.getProperty(REQUEST_START_TIME); LATENCY.labels(path, method).observe(System.currentTimeMillis() - start); } else { - FAILED.labels(path, method, status.name()).inc(); + FAILED.labels(path, method, String.valueOf(status.getStatusCode())).inc(); } }