diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index c1137bcfc25b7..089c679170bfb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -761,7 +761,6 @@ public void start() throws PulsarServerException { config.getBacklogQuotaDefaultLimitSecond(), config.getDefaultRetentionTimeInMinutes() * 60)); } - localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) : null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java new file mode 100644 index 0000000000000..d40db8534a80f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestEndpointMetricsFilter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.semconv.SemanticAttributes; +import java.io.IOException; +import java.util.List; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.container.ContainerResponseFilter; +import javax.ws.rs.core.Response; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry; +import org.glassfish.jersey.server.internal.routing.UriRoutingContext; +import org.glassfish.jersey.server.model.ResourceMethod; +import org.glassfish.jersey.uri.UriTemplate; + +public class RestEndpointMetricsFilter implements ContainerResponseFilter, ContainerRequestFilter { + private static final String REQUEST_START_TIME = "requestStartTime"; + private static final AttributeKey PATH = SemanticAttributes.URL_PATH; + private static final AttributeKey METHOD = SemanticAttributes.HTTP_REQUEST_METHOD; + private static final AttributeKey CODE = SemanticAttributes.HTTP_RESPONSE_STATUS_CODE; + + private final DoubleHistogram latency; + + private RestEndpointMetricsFilter(PulsarBrokerOpenTelemetry openTelemetry) { + Meter meter = openTelemetry.getMeter(); + latency = meter.histogramBuilder("pulsar_broker_rest_endpoint_latency") + .setDescription("Latency of REST endpoints in Pulsar broker") + .setUnit("ms") + .setExplicitBucketBoundariesAdvice(List.of(10D, 20D, 50D, 100D, 200D, 500D, 1000D, 2000D)) + .build(); + } + + private static volatile RestEndpointMetricsFilter instance; + + public static synchronized RestEndpointMetricsFilter create(PulsarBrokerOpenTelemetry openTelemetry) { + if (instance == null) { + instance = new RestEndpointMetricsFilter(openTelemetry); + } + return instance; + } + + @Override + public void filter(ContainerRequestContext req, ContainerResponseContext resp) throws IOException { + Response.StatusType status = resp.getStatusInfo(); + int statusCode = status.getStatusCode(); + Attributes attrs; + try { + UriRoutingContext info = (UriRoutingContext) req.getUriInfo(); + attrs = getRequestAttributes(info, statusCode); + } catch (Throwable ex) { + attrs = Attributes.of(PATH, "UNKNOWN", METHOD, req.getMethod(), CODE, (long) statusCode); + } + + Object o = req.getProperty(REQUEST_START_TIME); + if (o instanceof Long start) { + long duration = System.currentTimeMillis() - start; + this.latency.record(duration, attrs); + } + } + + @Override + public void filter(ContainerRequestContext req) throws IOException { + // Set the request start time into properties. + req.setProperty(REQUEST_START_TIME, System.currentTimeMillis()); + } + + private static Attributes getRequestAttributes(UriRoutingContext ctx, long statusCode) { + List templates = ctx.getMatchedTemplates(); + ResourceMethod method = ctx.getMatchedResourceMethod(); + String httpMethod = method == null ? "UNKNOWN" : method.getHttpMethod(); + if (CollectionUtils.isEmpty(templates)) { + return Attributes.of(PATH, "UNKNOWN", METHOD, httpMethod, CODE, statusCode); + } + UriTemplate[] arr = templates.toArray(new UriTemplate[0]); + int idx = arr.length - 1; + StringBuilder builder = new StringBuilder(); + for (; idx >= 0; idx--) { + builder.append(arr[idx].getTemplate()); + } + String template = builder.toString().replace("{", ":").replace("}", ""); + return Attributes.of(PATH, template, METHOD, httpMethod, CODE, statusCode); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 902593b7bf678..d438b7a1e23fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -192,6 +192,8 @@ private void addResourceServlet(String basePath, boolean requiresAuthentication, config.register(JsonMapperProvider.class); } config.register(MultiPartFeature.class); + // Register the filter that will collect metrics for REST endpoints + config.register(RestEndpointMetricsFilter.create(pulsar.getOpenTelemetry())); ServletHolder servletHolder = new ServletHolder(new ServletContainer(config)); servletHolder.setAsyncSupported(true); addServlet(basePath, servletHolder, requiresAuthentication, attributeMap); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java new file mode 100644 index 0000000000000..a7983c6b01fc9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerRestEndpointMetricsTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.metrics.data.Data; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.semconv.SemanticAttributes; +import java.util.Collection; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class BrokerRestEndpointMetricsTest extends MockedPulsarServiceBaseTest { + private static final Logger log = LoggerFactory.getLogger(BrokerRestEndpointMetricsTest.class); + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + } + + @BeforeMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + + @Test + public void testMetrics() throws Exception { + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + admin.tenants().createTenant("test", TenantInfo.builder().allowedClusters(Set.of("test")).build()); + admin.namespaces().createNamespace("test/test"); + String topic = "persistent://test/test/test_" + UUID.randomUUID(); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().getList("test/test"); + + // This request will be failed + try { + admin.topics().createNonPartitionedTopic("persistent://test1/test1/test1"); + } catch (Exception e) { + // ignore + } + + admin.topics().delete(topic, true); + admin.namespaces().deleteNamespace("test/test"); + admin.tenants().deleteTenant("test"); + + Collection metricDatas = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + log.info("Metrics size: {}", metricDatas.size()); + Optional optional = metricDatas.stream().peek(m -> log.info("metric name: {}", m.getName())) + .filter(m -> m.getName().equals("pulsar_broker_rest_endpoint_latency")).findFirst(); + Assert.assertTrue(optional.isPresent()); + + MetricData metricData = optional.get(); + Assert.assertFalse(metricData.getDescription().isEmpty()); + Assert.assertEquals(metricData.getUnit(), "ms"); + Assert.assertEquals(metricData.getType(), MetricDataType.HISTOGRAM); + + @SuppressWarnings("unchecked") + Data data = (Data) metricData.getData(); + data.getPoints().forEach(point -> { + hasAttributes(point); + Assert.assertTrue(point.getCount() > 0); + Assert.assertTrue(point.getSum() > 0); + }); + + Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "DELETE")); + Assert.assertTrue(hasPoint(data, "/persistent/:tenant/:namespace/:topic", "PUT")); + Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "PUT")); + Assert.assertTrue(hasPoint(data, "/tenants/:tenant", "DELETE")); + Assert.assertTrue(hasPoint(data, "/clusters/:cluster", "PUT")); + Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "PUT")); + Assert.assertTrue(hasPoint(data, "/namespaces/:tenant/:namespace", "DELETE")); + } + + private static boolean hasPoint(Data data, String uri, String method) { + Collection points = data.getPoints(); + for (HistogramPointData point : points) { + Attributes attrs = point.getAttributes(); + + if (attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD).equals(method) + && attrs.get(SemanticAttributes.URL_PATH).equals(uri)) { + return true; + } + } + + return false; + } + + private static void hasAttributes(HistogramPointData data) { + Attributes attrs = data.getAttributes(); + Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_REQUEST_METHOD)); + Assert.assertNotNull(attrs.get(SemanticAttributes.URL_PATH)); + Assert.assertNotNull(attrs.get(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE)); + } +} \ No newline at end of file