diff --git a/conf/broker.conf b/conf/broker.conf index f64d08a1de88c..60ef93fcf14f0 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1432,6 +1432,9 @@ authenticateMetricsEndpoint=false # Enable topic level metrics exposeTopicLevelMetricsInPrometheus=true +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Enable consumer level metrics. default is false exposeConsumerLevelMetricsInPrometheus=false diff --git a/conf/standalone.conf b/conf/standalone.conf index ed883406883ed..695256a57204a 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -985,6 +985,9 @@ exposePublisherStats=true # Default is false. exposePreciseBacklogInPrometheus=false +# Enable expose per rest endpoint metrics of the broker. +exposePerRestEndpointMetricsInPrometheus=false + # Enable splitting topic and partition label in Prometheus. # If enabled, a topic name will split into 2 parts, one is topic name without partition index, # another one is partition index, e.g. (topic=xxx, partition=0). diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 106410d855e22..678d87c96db7e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2783,6 +2783,12 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se ) private boolean exposeBundlesMetricsInPrometheus = false; + @FieldContext( + dynamic = true, + category = CATEGORY_METRICS, + doc = "Enable expose per rest endpoint metrics of the broker.") + private boolean exposePerRestEndpointMetricsInPrometheus = false; + /**** --- Functions. --- ****/ @FieldContext( category = CATEGORY_FUNCTIONS, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java new file mode 100644 index 0000000000000..604e63492de9a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import io.prometheus.client.Counter; +import io.prometheus.client.Histogram; +import java.io.IOException; +import java.util.Stack; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.container.ContainerResponseFilter; +import javax.ws.rs.core.Response; +import org.glassfish.jersey.server.internal.routing.UriRoutingContext; +import org.glassfish.jersey.server.model.Resource; +import org.glassfish.jersey.server.model.ResourceMethod; + +public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { + private static final LoadingCache CACHE = CacheBuilder + .newBuilder() + .maximumSize(100) + .build(new CacheLoader<>() { + @Override + public String load(ResourceMethod method) throws Exception { + return getRestPath(method); + } + }); + + private static final Histogram LATENCY = Histogram + .build("pulsar_broker_rest_endpoint_latency", "-") + .unit("ms") + .labelNames("path", "method") + .buckets(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D) + .register(); + private static final Counter FAILED = Counter + .build("pulsar_broker_rest_endpoint_failed", "-") + .labelNames("path", "method", "code") + .register(); + + private static final String REQUEST_START_TIME = "requestStartTime"; + + @Override + public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { + String path; + try { + UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); + ResourceMethod rm = info.getMatchedResourceMethod(); + path = CACHE.get(rm); + } catch (Throwable ex) { + path = "UNKNOWN"; + } + + String method = req.getMethod(); + Response.StatusType status = resp.getStatusInfo(); + if (status.getStatusCode() < Response.Status.BAD_REQUEST.getStatusCode()) { + long start = req.getProperty(REQUEST_START_TIME) == null + ? System.currentTimeMillis() : (long) req.getProperty(REQUEST_START_TIME); + LATENCY.labels(path, method).observe(System.currentTimeMillis() - start); + } else { + FAILED.labels(path, method, String.valueOf(status.getStatusCode())).inc(); + } + } + + @Override + public void filter(ContainerRequestContext req) throws IOException { + // Set the request start time into properties. + req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); + } + + private static String getRestPath(ResourceMethod method) { + try { + StringBuilder fullPath = new StringBuilder(); + Stack pathStack = new Stack<>(); + Resource parent = method.getParent(); + + while (true) { + String path = parent.getPath(); + parent = parent.getParent(); + if (parent == null) { + if (!path.endsWith("/") && !pathStack.peek().startsWith("/")) { + pathStack.push("/"); + } + pathStack.push(path); + break; + } + pathStack.push(path); + + } + while (!pathStack.isEmpty()) { + fullPath.append(pathStack.pop().replace("{", ":").replace("}", "")); + } + return fullPath.toString(); + } catch (Exception ex) { + return "UNKNOWN"; + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 96e7c9d556b71..3ad2069f77cf7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -188,6 +188,9 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, config.register(JsonMapperProvider.class); } config.register(MultiPartFeature.class); + if (pulsar.getConfiguration().isExposePerRestEndpointMetricsInPrometheus()) { + config.register(RestEndpointMetricsFilter.class); + } ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java new file mode 100644 index 0000000000000..9a1a17cca6665 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.google.common.collect.Multimap; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Set; +import java.util.UUID; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class BrokerRestEndpointMetricsTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + conf.setExposePerRestEndpointMetricsInPrometheus(true); + baseSetup(); + } + + @BeforeMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testMetrics() throws Exception { + admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); + admin.namespaces().createNamespace("test/test"); + String topic = "persistent://test/test/test_" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().getList("test/test"); + + // This request will be failed + try { + admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); + } catch (Exception e) { + // ignore + } + + admin.topics().delete(topic, true); + admin.namespaces().deleteNamespace("test/test"); + admin.tenants().deleteTenant("test"); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + + String metricsStr = output.toString(StandardCharsets.UTF_8); + Multimap metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr); + + Collection latency = metricsMap.get("pulsar_broker_rest_endpoint_latency_ms_sum"); + Collection failed = metricsMap.get("pulsar_broker_rest_endpoint_failed_total"); + + Assert.assertTrue(latency.size() > 0); + Assert.assertTrue(failed.size() > 0); + + for (PrometheusMetricsTest.Metric m : latency) { + Assert.assertNotNull(m.tags.get("cluster")); + Assert.assertNotNull(m.tags.get("path")); + Assert.assertNotNull(m.tags.get("method")); + } + + for (PrometheusMetricsTest.Metric m : failed) { + Assert.assertNotNull(m.tags.get("cluster")); + Assert.assertNotNull(m.tags.get("path")); + Assert.assertNotNull(m.tags.get("method")); + Assert.assertNotNull(m.tags.get("code")); + } + } +}