Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
Expand Down Expand Up @@ -1494,8 +1495,22 @@ public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(St
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
|| actEx instanceof PulsarAdminException.NotFoundException) {
return CompletableFuture.completedFuture(false);
} else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){
if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) {
// Since the feature PIP-344 isn't supported, restore the behavior to previous
// behavior before https://github.com/apache/pulsar/pull/22838 changes.
log.info("{} Checking the existence of a non-persistent non-partitioned topic "
+ "was performed using the behavior prior to PIP-344 changes, "
+ "because the broker does not support the PIP-344 feature "
+ "'supports_get_partitioned_metadata_without_auto_creation'.",
topic);
return CompletableFuture.completedFuture(false);
} else {
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
} else {
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
log.error("{} Failed to get partition metadata", topic, ex);
return CompletableFuture.failedFuture(ex);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,32 @@
*/
package org.apache.pulsar.broker.admin;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
Expand Down Expand Up @@ -219,4 +231,80 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation,
paramMetadataAutoCreationEnabled, isUsingHttpLookup);
}

@DataProvider(name = "autoCreationParamsAllForNonPersistentTopic")
public Object[][] autoCreationParamsAllForNonPersistentTopic(){
return new Object[][]{
// configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup.
{true, true, true},
{true, true, false},
{true, false, true},
{true, false, false},
{false, true, true},
{false, true, false},
{false, false, true},
{false, false, false}
};
}

@Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic", priority = Integer.MAX_VALUE)
public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation,
boolean paramMetadataAutoCreationEnabled,
boolean isUsingHttpLookup) throws Exception {
modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3);

// Initialize the connections of internal Pulsar Client.
PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient();
PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient();
client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));

// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}
// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback
// to "getPartitionsForTopic(topic, true)" behavior.
int lookupPermitsBefore = getLookupRequestPermits();

// Verify: we will not get an un-support error.
PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup);
for (PulsarClientImpl client : clientArray) {
final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp");
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
} catch (Exception ex) {
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException
|| unwrapEx instanceof PulsarClientException.NotFoundException);
assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from"
+ " the broker"));
}
}

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});

// reset clients.
for (PulsarClientImpl client : Arrays.asList(client1, client2)) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,21 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -225,6 +229,60 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t
}
}

@Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE)
public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception {
modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3);
// Initialize connections.
String pulsarUrl = pulsar1.getBrokerServiceUrl();
PulsarClientImpl[] clients = getClientsToTest(false);
for (PulsarClientImpl client : clients) {
client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1"));
}
// Inject a not support flag into the connections initialized.
Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
field.setAccessible(true);
for (PulsarClientImpl client : clients) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, false);
}
}

// Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback to
// "getPartitionsForTopic(topic)" behavior.
int lookupPermitsBefore = getLookupRequestPermits();
for (PulsarClientImpl client : clients) {
// Verify: the behavior of topic creation.
final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
client.getPartitionedTopicMetadata(tp, false, true).join();
Optional<PartitionedTopicMetadata> metadata1 = pulsar1.getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources()
.getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join();
assertTrue(metadata1.isPresent());
assertEquals(metadata1.get().partitions, 3);

// Verify: lookup semaphore has been releases.
Awaitility.await().untilAsserted(() -> {
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
});

// Cleanup.
admin1.topics().deletePartitionedTopic(tp, false);
}

// reset clients.
for (PulsarClientImpl client : clients) {
ConnectionPool pool = client.getCnxPool();
for (CompletableFuture<ClientCnx> connectionFuture : pool.getConnections()) {
ClientCnx clientCnx = connectionFuture.join();
clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation();
field.set(clientCnx, true);
}
}
}

@DataProvider(name = "autoCreationParamsAll")
public Object[][] autoCreationParamsAll(){
return new Object[][]{
Expand Down Expand Up @@ -265,7 +323,7 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 0);
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
assertFalse(partitionedTopics.contains(topicNameStr));
Expand Down Expand Up @@ -298,7 +356,7 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic
for (PulsarClientImpl client : clientArray) {
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response =
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join();
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join();
assertEquals(response.partitions, 3);
verifyNonPartitionedTopicNeverCreated(topicNameStr);

Expand Down Expand Up @@ -332,7 +390,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
// Case-1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 3);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -347,7 +405,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -380,7 +438,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
// Case 1: normal topic.
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp");
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join();
PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join();
assertEquals(response.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics = admin1.topics().getPartitionedTopicList("public/default");
Expand All @@ -392,7 +450,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo
topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1";
// Verify: the result of get partitioned topic metadata.
PartitionedTopicMetadata response2 =
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join();
client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join();
assertEquals(response2.partitions, 0);
// Verify: the behavior of topic creation.
List<String> partitionedTopics2 =
Expand Down Expand Up @@ -443,7 +501,7 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati
final TopicName topicName = TopicName.get(topicNameStr);
// Verify: the result of get partitioned topic metadata.
try {
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
fail("Expect a not found exception");
} catch (Exception e) {
Expand Down Expand Up @@ -496,7 +554,7 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
// Verify: the result of get partitioned topic metadata.
try {
PartitionedTopicMetadata topicMetadata = client
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled)
.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false)
.join();
log.info("Get topic metadata: {}", topicMetadata.partitions);
fail("Expected a not found ex");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
((PulsarClientImpl) pulsarClient).setLookup(mockLookup);
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer(
i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));
when(mockLookup.getBroker(any())).thenAnswer(ignored -> {
InetSocketAddress brokerAddress =
new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public void testTransactionBufferLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down Expand Up @@ -253,7 +254,8 @@ public void testPendingAckLowWaterMark() throws Exception {

PartitionedTopicMetadata partitionedTopicMetadata =
((PulsarClientImpl) pulsarClient).getLookup()
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get();
.getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false)
.get();
Transaction lowWaterMarkTxn = null;
for (int i = 0; i < partitionedTopicMetadata.partitions; i++) {
lowWaterMarkTxn = pulsarClient.newTransaction()
Expand Down
Loading