From 195345683b45a34dde48b5e6c7d88706b17e91bc Mon Sep 17 00:00:00 2001 From: daojun Date: Sun, 15 Jan 2023 18:53:08 +0800 Subject: [PATCH 1/7] Add `topic_load_failed` metric --- .../apache/pulsar/broker/service/BrokerService.java | 9 +++++++++ .../org/apache/pulsar/broker/service/PulsarStats.java | 8 ++++++++ .../pulsar/broker/stats/BrokerOperabilityMetrics.java | 10 +++++++++- 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 663d013dc7439..e334988de1d3c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1226,6 +1226,10 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture> createNonPersistentTopic(String topic) { CompletableFuture> topicFuture = new CompletableFuture<>(); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(topic); + return Optional.empty(); + }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { log.debug("Broker is unable to load non-persistent topic {}", topic); @@ -1618,6 +1622,11 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); + topicFuture.exceptionally(t -> { + pulsarStats.recordTopicLoadFailed(topic); + return Optional.empty(); + }); + if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index e959e9bbda2bb..d73a0cca7d553 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -265,6 +265,14 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) { } } + public void recordTopicLoadFailed(String topic) { + try { + brokerOperabilityMetrics.recordTopicLoadFailed(); + } catch (Exception ex) { + log.warn("Exception while recording topic failed for topic {}, {}", topic, ex.getMessage()); + } + } + public void recordConnectionCreate() { brokerOperabilityMetrics.recordConnectionCreate(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 909133338719f..71521b42282ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.stats; +import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,6 +39,7 @@ public class BrokerOperabilityMetrics { private final LongAdder connectionCreateFailCount; private final LongAdder connectionTotalClosedCount; private final LongAdder connectionActive; + private final Counter topicLoadFailed = Counter.build("topic_load_failed", "-").register(); public BrokerOperabilityMetrics(String localCluster, String brokerName) { this.metricsList = new ArrayList<>(); @@ -84,7 +86,9 @@ Map getDimensionMap(String metricsName) { } Metrics getTopicLoadMetrics() { - return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); + metrics.put("brk_topic_load_failed_count", topicLoadFailed.get()); + return metrics; } Metrics getDimensionMetrics(String metricsName, String dimensionName, DimensionStats stats) { @@ -112,6 +116,10 @@ public void recordTopicLoadTimeValue(long topicLoadLatencyMs) { topicLoadStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS); } + public void recordTopicLoadFailed() { + this.topicLoadFailed.inc(); + } + public void recordConnectionCreate() { this.connectionTotalCreatedCount.increment(); this.connectionActive.increment(); From b17a9511ade458d7cf23ebbf3731dd3bc142b62b Mon Sep 17 00:00:00 2001 From: daojun Date: Sun, 15 Jan 2023 19:47:06 +0800 Subject: [PATCH 2/7] review fix --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e334988de1d3c..35d14ee6db5c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1228,7 +1228,7 @@ private CompletableFuture> createNonPersistentTopic(String topic CompletableFuture> topicFuture = new CompletableFuture<>(); topicFuture.exceptionally(t -> { pulsarStats.recordTopicLoadFailed(topic); - return Optional.empty(); + return null; }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { if (log.isDebugEnabled()) { @@ -1624,7 +1624,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, topicFuture.exceptionally(t -> { pulsarStats.recordTopicLoadFailed(topic); - return Optional.empty(); + return null; }); if (isTransactionInternalName(topicName)) { From 28461b70c8a52123d7a7dabb1f7479a68f656efb Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 17 Jan 2023 02:01:26 +0800 Subject: [PATCH 3/7] review fix & add tests --- .../pulsar/broker/service/BrokerService.java | 4 +- .../pulsar/broker/service/PulsarStats.java | 8 +- .../broker/service/BrokerServiceTest.java | 96 ++++++++++++++++--- .../broker/stats/PrometheusMetricsTest.java | 4 + 4 files changed, 92 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 35d14ee6db5c7..3dc7718c32a74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1227,7 +1227,7 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture> createNonPersistentTopic(String topic) { CompletableFuture> topicFuture = new CompletableFuture<>(); topicFuture.exceptionally(t -> { - pulsarStats.recordTopicLoadFailed(topic); + pulsarStats.recordTopicLoadFailed(); return null; }); if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { @@ -1623,7 +1623,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); topicFuture.exceptionally(t -> { - pulsarStats.recordTopicLoadFailed(topic); + pulsarStats.recordTopicLoadFailed(); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index d73a0cca7d553..db14892d26663 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -265,12 +265,8 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) { } } - public void recordTopicLoadFailed(String topic) { - try { - brokerOperabilityMetrics.recordTopicLoadFailed(); - } catch (Exception ex) { - log.warn("Exception while recording topic failed for topic {}, {}", topic, ex.getMessage()); - } + public void recordTopicLoadFailed() { + brokerOperabilityMetrics.recordTopicLoadFailed(); } public void recordConnectionCreate() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index e9b7ddb991e57..b1f785aef8ea1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -33,6 +33,7 @@ import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; @@ -42,14 +43,7 @@ import java.io.InputStreamReader; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -100,13 +94,13 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.BundlesData; -import org.apache.pulsar.common.policies.data.LocalPolicies; -import org.apache.pulsar.common.policies.data.SubscriptionStats; -import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.policies.data.*; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -1513,4 +1507,82 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception assertTrue(conf.isForceDeleteTenantAllowed()); }); } + + + @Test + public void testBrokerStatsTopicLoadFailed() throws Exception { + admin.namespaces().createNamespace("prop/ns-test"); + + String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID(); + String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID(); + + BrokerService brokerService = pulsar.getBrokerService(); + brokerService = Mockito.spy(brokerService); + // mock create persistent topic failed + Mockito + .doAnswer(invocation -> { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic))); + + // mock create non-persistent topic failed + Mockito + .doAnswer(inv -> { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new RuntimeException("This is an exception")); + return f; + }) + .when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic)); + + + PulsarService pulsarService = pulsar; + Field field = PulsarService.class.getDeclaredField("brokerService"); + field.setAccessible(true); + field.set(pulsarService, brokerService); + + CompletableFuture> producer = pulsarClient.newProducer(Schema.STRING) + .topic(persistentTopic) + .createAsync(); + CompletableFuture> producer1 = pulsarClient.newProducer(Schema.STRING) + .topic(nonPersistentTopic) + .createAsync(); + + producer.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + producer1.whenComplete((v, t) -> { + if (t == null) { + try { + v.close(); + } catch (PulsarClientException e) { + // ignore + } + } + }); + + Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> { + String json = admin.brokerStats().getMetrics(); + JsonArray metrics = new Gson().fromJson(json, JsonArray.class); + AtomicBoolean flag = new AtomicBoolean(false); + + metrics.forEach(ele -> { + JsonObject obj = ((JsonObject) ele); + JsonObject metrics0 = (JsonObject) obj.get("metrics"); + JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count"); + if (null != v && v.getAsDouble() >= 2D) { + flag.set(true); + } + }); + + return flag.get(); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index a7a28afd8ac64..6cb7378330f09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -333,6 +333,10 @@ public void testPerTopicStats() throws Exception { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List) metrics.get("topic_load_failed_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).tags.get("cluster"), "test"); + cm = (List) metrics.get("pulsar_in_bytes_total"); assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2"); From dd3bbbb982da5830fc0ca25374b490b72fbb96b1 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 1 Mar 2023 01:35:52 +0800 Subject: [PATCH 4/7] fix imports --- .../org/apache/pulsar/broker/service/BrokerServiceTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index b1f785aef8ea1..40f1bdb0b2a7a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -99,8 +99,6 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; From 496a0110f91066a49de8f57672ddd60d9c3618cc Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 1 Mar 2023 01:38:16 +0800 Subject: [PATCH 5/7] fix imports --- .../pulsar/broker/service/BrokerServiceTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 40f1bdb0b2a7a..1d45c87b2f256 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -43,7 +43,14 @@ import java.io.InputStreamReader; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -94,7 +101,10 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.*; +import org.apache.pulsar.common.policies.data.BundlesData; +import org.apache.pulsar.common.policies.data.LocalPolicies; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.awaitility.Awaitility; From 2e6567b3b90b4a9b0e56858d3f76c36d018cf354 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 2 Mar 2023 14:26:09 +0800 Subject: [PATCH 6/7] fix test issue --- .../apache/pulsar/broker/stats/BrokerOperabilityMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 71521b42282ac..e6f264cfcfdb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -39,7 +39,7 @@ public class BrokerOperabilityMetrics { private final LongAdder connectionCreateFailCount; private final LongAdder connectionTotalClosedCount; private final LongAdder connectionActive; - private final Counter topicLoadFailed = Counter.build("topic_load_failed", "-").register(); + private static final Counter topicLoadFailed = Counter.build("topic_load_failed", "-").register(); public BrokerOperabilityMetrics(String localCluster, String brokerName) { this.metricsList = new ArrayList<>(); From 4d1fab7241a81f8bb9b495c5a981dc39b7b25595 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 2 Mar 2023 14:26:54 +0800 Subject: [PATCH 7/7] fix test issue --- .../pulsar/broker/stats/BrokerOperabilityMetrics.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index e6f264cfcfdb7..400dbd3335a2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -30,6 +30,7 @@ /** */ public class BrokerOperabilityMetrics { + private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register(); private final List metricsList; private final String localCluster; private final DimensionStats topicLoadStats; @@ -39,7 +40,6 @@ public class BrokerOperabilityMetrics { private final LongAdder connectionCreateFailCount; private final LongAdder connectionTotalClosedCount; private final LongAdder connectionActive; - private static final Counter topicLoadFailed = Counter.build("topic_load_failed", "-").register(); public BrokerOperabilityMetrics(String localCluster, String brokerName) { this.metricsList = new ArrayList<>(); @@ -87,7 +87,7 @@ Map getDimensionMap(String metricsName) { Metrics getTopicLoadMetrics() { Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats); - metrics.put("brk_topic_load_failed_count", topicLoadFailed.get()); + metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get()); return metrics; } @@ -117,7 +117,7 @@ public void recordTopicLoadTimeValue(long topicLoadLatencyMs) { } public void recordTopicLoadFailed() { - this.topicLoadFailed.inc(); + this.TOPIC_LOAD_FAILED.inc(); } public void recordConnectionCreate() {