Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1226,6 +1226,10 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V

private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
return null;
});
if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
Expand Down Expand Up @@ -1618,6 +1622,11 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
TopicName topicName = TopicName.get(topic);
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());

topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
return null;
});

if (isTransactionInternalName(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public void recordTopicLoadTimeValue(String topic, long topicLoadLatencyMs) {
}
}

public void recordTopicLoadFailed() {
brokerOperabilityMetrics.recordTopicLoadFailed();
}

public void recordConnectionCreate() {
brokerOperabilityMetrics.recordConnectionCreate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -29,6 +30,7 @@
/**
*/
public class BrokerOperabilityMetrics {
private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register();
private final List<Metrics> metricsList;
private final String localCluster;
private final DimensionStats topicLoadStats;
Expand Down Expand Up @@ -84,7 +86,9 @@ Map<String, String> getDimensionMap(String metricsName) {
}

Metrics getTopicLoadMetrics() {
return getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
Metrics metrics = getDimensionMetrics("topic_load_times", "topic_load", topicLoadStats);
metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
return metrics;
}

Metrics getDimensionMetrics(String metricsName, String dimensionName, DimensionStats stats) {
Expand Down Expand Up @@ -112,6 +116,10 @@ public void recordTopicLoadTimeValue(long topicLoadLatencyMs) {
topicLoadStats.recordDimensionTimeValue(topicLoadLatencyMs, TimeUnit.MILLISECONDS);
}

public void recordTopicLoadFailed() {
this.TOPIC_LOAD_FAILED.inc();
}

public void recordConnectionCreate() {
this.connectionTotalCreatedCount.increment();
this.connectionActive.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -107,6 +108,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -1513,4 +1515,82 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception
assertTrue(conf.isForceDeleteTenantAllowed());
});
}


@Test
public void testBrokerStatsTopicLoadFailed() throws Exception {
admin.namespaces().createNamespace("prop/ns-test");

String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID();
String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID();

BrokerService brokerService = pulsar.getBrokerService();
brokerService = Mockito.spy(brokerService);
// mock create persistent topic failed
Mockito
.doAnswer(invocation -> {
CompletableFuture<ManagedLedgerConfig> f = new CompletableFuture<>();
f.completeExceptionally(new RuntimeException("This is an exception"));
return f;
})
.when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic)));

// mock create non-persistent topic failed
Mockito
.doAnswer(inv -> {
CompletableFuture<Void> f = new CompletableFuture<>();
f.completeExceptionally(new RuntimeException("This is an exception"));
return f;
})
.when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic));


PulsarService pulsarService = pulsar;
Field field = PulsarService.class.getDeclaredField("brokerService");
field.setAccessible(true);
field.set(pulsarService, brokerService);

CompletableFuture<Producer<String>> producer = pulsarClient.newProducer(Schema.STRING)
.topic(persistentTopic)
.createAsync();
CompletableFuture<Producer<String>> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(nonPersistentTopic)
.createAsync();

producer.whenComplete((v, t) -> {
if (t == null) {
try {
v.close();
} catch (PulsarClientException e) {
// ignore
}
}
});
producer1.whenComplete((v, t) -> {
if (t == null) {
try {
v.close();
} catch (PulsarClientException e) {
// ignore
}
}
});

Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> {
String json = admin.brokerStats().getMetrics();
JsonArray metrics = new Gson().fromJson(json, JsonArray.class);
AtomicBoolean flag = new AtomicBoolean(false);

metrics.forEach(ele -> {
JsonObject obj = ((JsonObject) ele);
JsonObject metrics0 = (JsonObject) obj.get("metrics");
JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count");
if (null != v && v.getAsDouble() >= 2D) {
flag.set(true);
}
});

return flag.get();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,10 @@ public void testPerTopicStats() throws Exception {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");

cm = (List<Metric>) metrics.get("topic_load_failed_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");

cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
Expand Down