diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fee5e25647ce6..18808c1801483 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1612,7 +1612,13 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); topicFuture.exceptionally(t -> { - pulsarStats.recordTopicLoadFailed(); + // If the broker is not the owner of the topic, the client will redirect to another broker. + // In this case, we should not record as a topic load failure. + Throwable cause = FutureUtil.unwrapCompletionException(t); + if (!(cause instanceof ServiceUnitNotReadyException + && cause.getMessage().contains("not served by this instance"))) { + getPulsarStats().recordTopicLoadFailed(); + } return null; }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 8dd2fc1c3c26d..7aabd62e63c3f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -714,6 +714,14 @@ public Object[][] incorrectPersistentPolicies() { }; } + @DataProvider(name = "TopicDomain") + public Object[][] topicDomain() { + return new Object[][] { + {"persistent"}, + {"non-persistent"} + }; + } + protected ServiceProducer getServiceProducer(ProducerImpl clientProducer, String topicName) { PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 951247bd68861..f9836bbf5c093 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -78,7 +78,6 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; @@ -103,7 +102,6 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "flaky") @@ -808,14 +806,6 @@ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception { assertFalse(getResult.isPresent()); } - @DataProvider(name = "topicDomain") - public Object[] topicDomain() { - return new Object[]{ - TopicDomain.persistent.value(), - TopicDomain.non_persistent.value() - }; - } - @Test(dataProvider = "topicDomain") public void testCheckTopicExists(String topicDomain) throws Exception { String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index e05bb836a3ce6..0755861af5954 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -26,7 +26,10 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -1308,7 +1311,7 @@ public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() thro topic.checkInactiveSubscriptions(); // Compaction subscription should not call delete method. - Mockito.verify(spySubscription, Mockito.never()).delete(); + verify(spySubscription, never()).delete(); // check if the subscription exist. assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION)); @@ -1916,5 +1919,67 @@ public void close() { } } } + + public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException() throws Exception { + final String topicName = "persistent://prop/ns-abc/topic-not-ready"; + BrokerService service = pulsar.getBrokerService(); + + // Mock the checkTopicNsOwnership to throw ServiceUnitNotReadyException + BrokerService spyService = spy(service); + doReturn(CompletableFuture.failedFuture( + new BrokerServiceException.ServiceUnitNotReadyException("Namespace bundle for topic " + + "(persistent://xxx/xxx/xxx-partition-0) not served by this instance:xyz. " + + "Please redo the lookup. Request is denied: namespace=pfs/ons"))) + .when(spyService).checkTopicNsOwnership(topicName); + + when(spyService.getPulsarStats()).thenReturn(mock(PulsarStats.class)); + + // Call the method under test + CompletableFuture> future = spyService.getTopic(topicName, true); + + try { + future.get(); + fail("Should have thrown an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException); + } + + // Verify that recordTopicLoadFailed was not called + verify(spyService.getPulsarStats(), never()).recordTopicLoadFailed(); + + doReturn(CompletableFuture.failedFuture( + new BrokerServiceException.ServerMetadataException("Server metadata error"))) + .when(spyService).checkTopicNsOwnership(topicName); + + // Call the method under test + future = spyService.getTopic(topicName, true); + + try { + future.get(); + fail("Should have thrown an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.ServerMetadataException); + } + + // Verify that recordTopicLoadFailed was not called + verify(spyService.getPulsarStats(), times(1)).recordTopicLoadFailed(); + + doReturn(CompletableFuture.failedFuture( + new BrokerServiceException.ServiceUnitNotReadyException("Other exception"))) + .when(spyService).checkTopicNsOwnership(topicName); + + // Call the method under test + future = spyService.getTopic(topicName, true); + + try { + future.get(); + fail("Should have thrown an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException); + } + + // Verify that recordTopicLoadFailed was not called + verify(spyService.getPulsarStats(), times(2)).recordTopicLoadFailed(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index e006b72fad279..71a1686555939 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -104,11 +104,6 @@ public static Object[][] schemaValidationModes() { }; } - @DataProvider(name = "topicDomain") - public static Object[] topicDomain() { - return new Object[] { "persistent://", "non-persistent://" }; - } - private final boolean schemaValidationEnforced; @Factory(dataProvider = "schemaValidationModes") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 5448751160a9c..4db3b27e1c544 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -98,11 +98,6 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @DataProvider(name = "topicDomain") - public static Object[] topicDomain() { - return new Object[]{ TopicDomain.persistent.value(), TopicDomain.non_persistent.value()}; - } - private Set publishMessages(String topic, int count, boolean enableBatch) throws Exception { return publishMessages(topic, 0, count, enableBatch, false); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index ab82f981b5df3..6db29ef64ba6c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -98,7 +98,6 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -128,11 +127,6 @@ public void cleanup() throws Exception { super.internalCleanup(); } - @DataProvider(name = "topicDomain") - public static Object[] topicDomain() { - return new Object[] { "persistent://", "non-persistent://" }; - } - @Test public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{ final String tenant = PUBLIC_TENANT;