diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1f0cb12258e1d..b08b1a472ca20 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1265,7 +1265,8 @@ private CompletableFuture> createNonPersistentTopic(String topic nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); } catch (Throwable e) { log.warn("Failed to create topic {}", topic, e); - return FutureUtil.failedFuture(e); + topicFuture.completeExceptionally(e); + return topicFuture; } CompletableFuture isOwner = checkTopicNsOwnership(topic); isOwner.thenRun(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 8ebba5c9aeabd..5fbe147638026 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -20,20 +20,23 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN; import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; @@ -79,6 +82,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient; import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; import org.apache.pulsar.client.admin.BrokerStats; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -113,7 +117,11 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.compaction.Compactor; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.MockZooKeeper; import org.awaitility.Awaitility; +import org.glassfish.jersey.client.JerseyClient; +import org.glassfish.jersey.client.JerseyClientBuilder; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -1589,82 +1597,93 @@ public void testDynamicConfigurationsForceDeleteTenantAllowed() throws Exception }); } - // this test is disabled since it is flaky - @Test(enabled = false) - public void testBrokerStatsTopicLoadFailed() throws Exception { - admin.namespaces().createNamespace("prop/ns-test"); - - String persistentTopic = "persistent://prop/ns-test/topic1_" + UUID.randomUUID(); - String nonPersistentTopic = "non-persistent://prop/ns-test/topic2_" + UUID.randomUUID(); - - BrokerService brokerService = pulsar.getBrokerService(); - brokerService = Mockito.spy(brokerService); - // mock create persistent topic failed - Mockito - .doAnswer(invocation -> { - CompletableFuture f = new CompletableFuture<>(); - f.completeExceptionally(new RuntimeException("This is an exception")); - return f; - }) - .when(brokerService).getManagedLedgerConfig(Mockito.eq(TopicName.get(persistentTopic))); - - // mock create non-persistent topic failed - Mockito - .doAnswer(inv -> { - CompletableFuture f = new CompletableFuture<>(); - f.completeExceptionally(new RuntimeException("This is an exception")); - return f; - }) - .when(brokerService).checkTopicNsOwnership(Mockito.eq(nonPersistentTopic)); - - - PulsarService pulsarService = pulsar; - Field field = PulsarService.class.getDeclaredField("brokerService"); - field.setAccessible(true); - field.set(pulsarService, brokerService); - - CompletableFuture> producer = pulsarClient.newProducer(Schema.STRING) - .topic(persistentTopic) - .createAsync(); - CompletableFuture> producer1 = pulsarClient.newProducer(Schema.STRING) - .topic(nonPersistentTopic) - .createAsync(); - - producer.whenComplete((v, t) -> { - if (t == null) { - try { - v.close(); - } catch (PulsarClientException e) { - // ignore - } + @Test + public void testMetricsPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + admin.topics().unload(topic); + + // Inject an error that makes the topic load fails. + AtomicBoolean failMarker = new AtomicBoolean(true); + mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> { + if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) && + path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) { + return true; } + return false; }); - producer1.whenComplete((v, t) -> { - if (t == null) { - try { - v.close(); - } catch (PulsarClientException e) { - // ignore - } + + // Do test + CompletableFuture> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + Multimap metricMap = PrometheusMetricsClient.parseMetrics(response); + if (!metricMap.containsKey("pulsar_topic_load_failed_count")) { + return false; + } + double topic_load_failed_count = 0; + for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_topic_load_failed_count")) { + topic_load_failed_count += metric.value; } + return topic_load_failed_count >= 1D; }); - Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> { - String json = admin.brokerStats().getMetrics(); - JsonArray metrics = new Gson().fromJson(json, JsonArray.class); - AtomicBoolean flag = new AtomicBoolean(false); - - metrics.forEach(ele -> { - JsonObject obj = ((JsonObject) ele); - JsonObject metrics0 = (JsonObject) obj.get("metrics"); - JsonPrimitive v = (JsonPrimitive) metrics0.get("brk_topic_load_failed_count"); - if (null != v && v.getAsDouble() >= 2D) { - flag.set(true); - } - }); + // Remove the injection. + failMarker.set(false); + // cleanup. + httpClient.close(); + producer.join().close(); + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); + } - return flag.get(); + @Test + public void testMetricsNonPersistentTopicLoadFails() throws Exception { + final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", ""); + String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID(); + admin.namespaces().createNamespace(namespace); + + // Inject an error that makes the topic load fails. + // Since we did not set a topic factory name, the "topicFactory" variable is null, inject a mocked + // "topicFactory". + Field fieldTopicFactory = BrokerService.class.getDeclaredField("topicFactory"); + fieldTopicFactory.setAccessible(true); + TopicFactory originalTopicFactory = (TopicFactory) fieldTopicFactory.get(pulsar.getBrokerService()); + assertNull(originalTopicFactory); + TopicFactory mockedTopicFactory = mock(TopicFactory.class); + when(mockedTopicFactory.create(anyString(), any(), any(), any())) + .thenThrow(new RuntimeException("mocked error")); + fieldTopicFactory.set(pulsar.getBrokerService(), mockedTopicFactory); + + // Do test. + CompletableFuture> producer = pulsarClient.newProducer().topic(topic).createAsync(); + JerseyClient httpClient = JerseyClientBuilder.createClient(); + Awaitility.await().until(() -> { + String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/") + .request().get(String.class); + Multimap metricMap = PrometheusMetricsClient.parseMetrics(response); + if (!metricMap.containsKey("pulsar_topic_load_failed_count")) { + return false; + } + double topic_load_failed_count = 0; + for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_topic_load_failed_count")) { + topic_load_failed_count += metric.value; + } + return topic_load_failed_count >= 1D; }); + + // Remove the injection. + fieldTopicFactory.set(pulsar.getBrokerService(), null); + + // cleanup. + httpClient.close(); + producer.join().close(); + admin.topics().delete(topic); + admin.namespaces().deleteNamespace(namespace); } @Test