diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 71b31eeeeda2b..6d9068482b4bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -40,8 +40,10 @@ import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; +import io.prometheus.client.Counter; import io.prometheus.client.Gauge; import java.io.IOException; +import io.prometheus.client.Histogram; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; @@ -251,6 +253,17 @@ protected Set initialValue() throws Exception { } }; + private static final Histogram CMD_LATENCY = Histogram + .build("pulsar_broker_command_execution_latency", "-") + .unit("ms") + .labelNames("command") + .buckets(1, 10, 50, 100, 200, 500, 1000, 2000) + .register(); + private static final Counter CMD_FAILED = Counter + .build("pulsar_broker_command_execution_failed", "-") + .labelNames("command", "code") + .register(); + enum State { Start, Connected, Failed, Connecting @@ -530,6 +543,8 @@ private void writeAndFlush(ByteBuf cmd) { @Override protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) { checkArgument(state == State.Connected); + final long now = System.currentTimeMillis(); + final String cmd = BaseCommand.Type.PARTITIONED_METADATA.name(); final long requestId = partitionMetadata.getRequestId(); if (log.isDebugEnabled()) { log.debug("[{}] Received PartitionMetadataLookup from {} for {}", partitionMetadata.getTopic(), @@ -538,6 +553,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa TopicName topicName = validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata); if (topicName == null) { + CMD_FAILED.labels(cmd, ServerError.InvalidTopicName.name()).inc(); return; } @@ -548,8 +564,8 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa partitionMetadata.getTopic(), remoteAddress, requestId, this.service.getPulsar().getState().toString()); } + CMD_FAILED.labels(cmd, ServerError.ServiceNotReady.name()).inc(); writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady, - "Failed due to pulsar service is not ready", requestId)); return; } @@ -562,25 +578,29 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa .handle((metadata, ex) -> { if (ex == null) { int partitions = metadata.partitions; + CMD_LATENCY.labels(cmd).observe(System.currentTimeMillis() - now); commandSender.sendPartitionMetadataResponse(partitions, requestId); } else { if (ex instanceof PulsarClientException) { log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(), remoteAddress, topicName, ex.getMessage()); + CMD_FAILED + .labels(cmd, ServerError.AuthorizationError.name()).inc(); commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, ex.getMessage(), requestId); } else { log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress, topicName, ex.getMessage(), ex); ServerError error = ServerError.ServiceNotReady; - if (ex instanceof RestException restException){ + if (ex instanceof RestException restException) { int responseCode = restException.getResponse().getStatus(); - if (responseCode == NOT_FOUND.getStatusCode()){ + if (responseCode == NOT_FOUND.getStatusCode()) { error = ServerError.TopicNotFound; - } else if (responseCode < INTERNAL_SERVER_ERROR.getStatusCode()){ + } else if (responseCode < INTERNAL_SERVER_ERROR.getStatusCode()) { error = ServerError.MetadataError; } } + CMD_FAILED.labels(cmd, error.name()).inc(); commandSender.sendPartitionMetadataResponse(error, ex.getMessage(), requestId); } } @@ -590,6 +610,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa } else { final String msg = "Client is not authorized to Get Partition Metadata"; log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, getPrincipal(), topicName); + CMD_FAILED.labels(cmd, ServerError.AuthorizationError.name()).inc(); writeAndFlush( Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId)); lookupSemaphore.release(); @@ -608,6 +629,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", remoteAddress, topicName); } + CMD_FAILED.labels(cmd, ServerError.TooManyRequests.name()).inc(); commandSender.sendPartitionMetadataResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ServerCnxStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ServerCnxStatsTest.java new file mode 100644 index 0000000000000..2ef2f3a69c330 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ServerCnxStatsTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import org.apache.pulsar.broker.service.*; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.junit.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Method; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +@Test(groups = "broker") +public class ServerCnxStatsTest extends BrokerTestBase { + + private final String tenant = "my-tenant"; + private final String namespace = "my-tenant/my-ns"; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + admin.tenants().createTenant(tenant, TenantInfo.builder().allowedClusters(Sets.newHashSet("test")).build()); + admin.namespaces().createNamespace(namespace); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + + @Test + public void testPartitionMetadataRequestMetrics() throws Exception { + String topic = "persistent://my-tenant/my-ns/my-topic_" + UUID.randomUUID(); + admin.topics().createPartitionedTopic(topic, 3); + CountDownLatch latch = new CountDownLatch(100); + + try (Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("my-sub") + .messageListener((c, m) -> { + try { + c.acknowledge(m); + latch.countDown(); + } catch (PulsarClientException e) { + e.printStackTrace(); + } + }) + .subscribe()) { + for (int a = 0; a < 100; a++) { + producer.send(UUID.randomUUID().toString()); + } + + latch.await(30, TimeUnit.SECONDS); + + // Mock failed PartitionMetadataRequest + ByteBuf buf = + Commands.newPartitionMetadataRequest("xx://xx/xx/" + UUID.randomUUID(), 100); + BaseCommand c = new BaseCommand(); + c.parseFrom(ByteBufUtil.getBytes(buf)); + CommandPartitionedTopicMetadata ctm = c.getPartitionMetadata(); + Topic topic1 = pulsar.getBrokerService().getTopic(topic + "-partition-0", false).get().get(); + Map producers = topic1.getProducers(); + Class klass = ServerCnx.class; + Method method = klass.getDeclaredMethod("handlePartitionMetadataRequest", CommandPartitionedTopicMetadata.class); + method.setAccessible(true); + for (org.apache.pulsar.broker.service.Producer p : producers.values()) { + ServerCnx tcnx = (ServerCnx) p.getCnx(); + method.invoke(tcnx, ctm); + } + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output); + Multimap metricsMap = PrometheusMetricsTest.parseMetrics(output.toString()); + Collection cmdExecutionFailed = metricsMap.get("pulsar_broker_command_execution_failed" + "_total"); + Collection cmdExecutionLatency = metricsMap.get("pulsar_broker_command_execution_latency_ms" + "_sum"); + + for (PrometheusMetricsTest.Metric m : cmdExecutionFailed) { + String cluster = m.tags.get("cluster"); + Assert.assertNotNull(cluster); + Assert.assertEquals(cluster, "test"); + String command = m.tags.get("command"); + Assert.assertNotNull(command); + if (command.equals(BaseCommand.Type.PARTITIONED_METADATA.name())) { + Assert.assertTrue(m.value >= 1); + } + } + + for (PrometheusMetricsTest.Metric m : cmdExecutionLatency) { + String cluster = m.tags.get("cluster"); + Assert.assertNotNull(cluster); + Assert.assertEquals(cluster, "test"); + String command = m.tags.get("command"); + Assert.assertNotNull(command); + if (command.equals(BaseCommand.Type.PARTITIONED_METADATA.name())) { + Assert.assertTrue(m.value > 0); + } + } + } + } +}