Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,13 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);

topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
// If the broker is not the owner of the topic, the client will redirect to another broker.
// In this case, we should not record as a topic load failure.
Throwable cause = FutureUtil.unwrapCompletionException(t);
if (!(cause instanceof ServiceUnitNotReadyException
&& cause.getMessage().contains("not served by this instance"))) {
getPulsarStats().recordTopicLoadFailed();
}
return null;
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,14 @@ public Object[][] incorrectPersistentPolicies() {
};
}

@DataProvider(name = "TopicDomain")
public Object[][] topicDomain() {
return new Object[][] {
{"persistent"},
{"non-persistent"}
};
}

protected ServiceProducer getServiceProducer(ProducerImpl clientProducer, String topicName) {
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -103,7 +102,6 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "flaky")
Expand Down Expand Up @@ -808,14 +806,6 @@ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
assertFalse(getResult.isPresent());
}

@DataProvider(name = "topicDomain")
public Object[] topicDomain() {
return new Object[]{
TopicDomain.persistent.value(),
TopicDomain.non_persistent.value()
};
}

@Test(dataProvider = "topicDomain")
public void testCheckTopicExists(String topicDomain) throws Exception {
String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -1308,7 +1311,7 @@ public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() thro
topic.checkInactiveSubscriptions();

// Compaction subscription should not call delete method.
Mockito.verify(spySubscription, Mockito.never()).delete();
verify(spySubscription, never()).delete();

// check if the subscription exist.
assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
Expand Down Expand Up @@ -1916,5 +1919,67 @@ public void close() {
}
}
}

public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic-not-ready";
BrokerService service = pulsar.getBrokerService();

// Mock the checkTopicNsOwnership to throw ServiceUnitNotReadyException
BrokerService spyService = spy(service);
doReturn(CompletableFuture.failedFuture(
new BrokerServiceException.ServiceUnitNotReadyException("Namespace bundle for topic "
+ "(persistent://xxx/xxx/xxx-partition-0) not served by this instance:xyz. "
+ "Please redo the lookup. Request is denied: namespace=pfs/ons")))
.when(spyService).checkTopicNsOwnership(topicName);

when(spyService.getPulsarStats()).thenReturn(mock(PulsarStats.class));

// Call the method under test
CompletableFuture<Optional<Topic>> future = spyService.getTopic(topicName, true);

try {
future.get();
fail("Should have thrown an exception");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException);
}

// Verify that recordTopicLoadFailed was not called
verify(spyService.getPulsarStats(), never()).recordTopicLoadFailed();

doReturn(CompletableFuture.failedFuture(
new BrokerServiceException.ServerMetadataException("Server metadata error")))
.when(spyService).checkTopicNsOwnership(topicName);

// Call the method under test
future = spyService.getTopic(topicName, true);

try {
future.get();
fail("Should have thrown an exception");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof BrokerServiceException.ServerMetadataException);
}

// Verify that recordTopicLoadFailed was not called
verify(spyService.getPulsarStats(), times(1)).recordTopicLoadFailed();

doReturn(CompletableFuture.failedFuture(
new BrokerServiceException.ServiceUnitNotReadyException("Other exception")))
.when(spyService).checkTopicNsOwnership(topicName);

// Call the method under test
future = spyService.getTopic(topicName, true);

try {
future.get();
fail("Should have thrown an exception");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException);
}

// Verify that recordTopicLoadFailed was not called
verify(spyService.getPulsarStats(), times(2)).recordTopicLoadFailed();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,6 @@ public static Object[][] schemaValidationModes() {
};
}

@DataProvider(name = "topicDomain")
public static Object[] topicDomain() {
return new Object[] { "persistent://", "non-persistent://" };
}

private final boolean schemaValidationEnforced;

@Factory(dataProvider = "schemaValidationModes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,6 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "topicDomain")
public static Object[] topicDomain() {
return new Object[]{ TopicDomain.persistent.value(), TopicDomain.non_persistent.value()};
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
return publishMessages(topic, 0, count, enableBatch, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
Expand Down Expand Up @@ -128,11 +127,6 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "topicDomain")
public static Object[] topicDomain() {
return new Object[] { "persistent://", "non-persistent://" };
}

@Test
public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
final String tenant = PUBLIC_TENANT;
Expand Down