diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 663d013dc7439..3dc7718c32a74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1226,6 +1226,10 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture> createNonPersistentTopic(String topic) { CompletableFuture> topicFuture = new CompletableFuture<>(); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); @@ -1618,6 +1622,11 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(); + return null; + }); + if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index e959e9bbda2bb..db14892d26663 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -265,6 +265,10 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) { } } + public void recordTopicLoadFailed() { + brokerOperabilityMetrics.recordTopicLoadFailed(); + } + public void recordConnectionCreate() { brokerOperabilityMetrics.recordConnectionCreate(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 909133338719f..400dbd3335a2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats; +import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -29,6 +30,7 @@ /** */ public class BrokerOperabilityMetrics { + private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register(); private final List metricsList; private final String localCluster; private final DimensionStats topicLoadStats; @@ -84,7 +86,9 @@ Map getDimensionMap(String metricsName) { } Metrics getTopicLoadMetrics() { - return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get()); + return metrics; } Metrics getDimensionMetrics(String metricsName, String dimensionName, DimensionStats stats) { @@ -112,6 +116,10 @@ public void recordTopicLoadTimeValue(long topicLoadLatencyMs) { topicLoadStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS); } + public void recordTopicLoadFailed() { + this.TOPIC_LOAD_FAILED.inc(); + } + public void recordConnectionCreate() { this.connectionTotalCreatedCount.increment(); this.connectionActive.increment(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index e9b7ddb991e57..1d45c87b2f256 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -33,6 +33,7 @@ import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; @@ -107,6 +108,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -1513,4 +1515,82 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception assertTrue(conf.isForceDeleteTenantAllowed()); }); } + + + @Test + public void testBrokerStatsTopicLoadFailed() throws Exception { + admin.namespaces().createNamespace("prop/ns-test"); + + String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID(); + String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID(); + + BrokerService brokerService = pulsar.getBrokerService(); + brokerService = Mockito.spy(brokerService); + // mock create persistent topic failed + Mockito + .doAnswer(invocation -> { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic))); + + // mock create non-persistent topic failed + Mockito + .doAnswer(inv -> { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic)); + + + PulsarService pulsarService = pulsar; + Field field = PulsarService.class.getDeclaredField("brokerService"); + field.setAccessible(true); + field.set(pulsarService, brokerService); + + CompletableFuture> producer = pulsarClient.newProducer(Schema.STRING) + .topic(persistentTopic) + .createAsync(); + CompletableFuture> producer1 = pulsarClient.newProducer(Schema.STRING) + .topic(nonPersistentTopic) + .createAsync(); + + producer.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + producer1.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + + Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> { + String json = admin.brokerStats().getMetrics(); + JsonArray metrics = new Gson().fromJson(json, JsonArray.class); + AtomicBoolean flag = new AtomicBoolean(false); + + metrics.forEach(ele -> { + JsonObject obj = ((JsonObject) ele); + JsonObject metrics0 = (JsonObject) obj.get("metrics"); + JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count"); + if (null != v && v.getAsDouble() >= 2D) { + flag.set(true); + } + }); + + return flag.get(); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index a7a28afd8ac64..6cb7378330f09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -333,6 +333,10 @@ public void testPerTopicStats() throws Exception { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List) metrics.get("topic_load_failed_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List) metrics.get("pulsar_in_bytes_total"); assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");