diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ec4c907234ab6..52e03fb8b9aec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; import com.google.common.hash.Hashing; import io.opentelemetry.api.common.AttributeKey; @@ -1494,8 +1495,22 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St || actEx instanceof PulsarClientException.TopicDoesNotExistException || actEx instanceof PulsarAdminException.NotFoundException) { return CompletableFuture.completedFuture(false); + } else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){ + if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) { + // Since the feature PIP-344 isn't supported, restore the behavior to previous + // behavior before https://github.com/apache/pulsar/pull/22838 changes. + log.info("{} Checking the existence of a non-persistent non-partitioned topic " + + "was performed using the behavior prior to PIP-344 changes, " + + "because the broker does not support the PIP-344 feature " + + "'supports_get_partitioned_metadata_without_auto_creation'.", + topic); + return CompletableFuture.completedFuture(false); + } else { + log.error("{} Failed to get partition metadata", topic, ex); + return CompletableFuture.failedFuture(ex); + } } else { - log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); + log.error("{} Failed to get partition metadata", topic, ex); return CompletableFuture.failedFuture(ex); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java index 28cf91ee165e2..60691203e777d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java @@ -18,20 +18,32 @@ */ package org.apache.pulsar.broker.admin; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; import java.net.URL; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-admin") @@ -219,4 +231,80 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation, paramMetadataAutoCreationEnabled, isUsingHttpLookup); } + + @DataProvider(name = "autoCreationParamsAllForNonPersistentTopic") + public Object[][] autoCreationParamsAllForNonPersistentTopic(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, true, true}, + {true, true, false}, + {true, false, true}, + {true, false, false}, + {false, true, true}, + {false, true, false}, + {false, false, true}, + {false, false, false} + }; + } + + @Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic", priority = Integer.MAX_VALUE) + public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3); + + // Initialize the connections of internal Pulsar Client. + PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient(); + PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient(); + client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1")); + client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1")); + + // Inject a not support flag into the connections initialized. + Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation"); + field.setAccessible(true); + for (PulsarClientImpl client : Arrays.asList(client1, client2)) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, false); + } + } + // Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback + // to "getPartitionsForTopic(topic, true)" behavior. + int lookupPermitsBefore = getLookupRequestPermits(); + + // Verify: we will not get an un-support error. + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp"); + try { + PartitionedTopicMetadata topicMetadata = client + .getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false) + .join(); + log.info("Get topic metadata: {}", topicMetadata.partitions); + } catch (Exception ex) { + Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException + || unwrapEx instanceof PulsarClientException.NotFoundException); + assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from" + + " the broker")); + } + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + + // reset clients. + for (PulsarClientImpl client : Arrays.asList(client1, client2)) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, true); + } + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index bf99b172829a7..87bc4267b48a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -23,10 +23,12 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.lang.reflect.Field; import java.net.URL; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -34,6 +36,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -225,6 +229,60 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t } } + @Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE) + public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception { + modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3); + // Initialize connections. + String pulsarUrl = pulsar1.getBrokerServiceUrl(); + PulsarClientImpl[] clients = getClientsToTest(false); + for (PulsarClientImpl client : clients) { + client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1")); + } + // Inject a not support flag into the connections initialized. + Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation"); + field.setAccessible(true); + for (PulsarClientImpl client : clients) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, false); + } + } + + // Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback to + // "getPartitionsForTopic(topic)" behavior. + int lookupPermitsBefore = getLookupRequestPermits(); + for (PulsarClientImpl client : clients) { + // Verify: the behavior of topic creation. + final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + client.getPartitionedTopicMetadata(tp, false, true).join(); + Optional metadata1 = pulsar1.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join(); + assertTrue(metadata1.isPresent()); + assertEquals(metadata1.get().partitions, 3); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + + // Cleanup. + admin1.topics().deletePartitionedTopic(tp, false); + } + + // reset clients. + for (PulsarClientImpl client : clients) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, true); + } + } + } + @DataProvider(name = "autoCreationParamsAll") public Object[][] autoCreationParamsAll(){ return new Object[][]{ @@ -265,7 +323,7 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo for (PulsarClientImpl client : clientArray) { // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response = - client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join(); + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join(); assertEquals(response.partitions, 0); List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); assertFalse(partitionedTopics.contains(topicNameStr)); @@ -298,7 +356,7 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic for (PulsarClientImpl client : clientArray) { // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response = - client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join(); + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join(); assertEquals(response.partitions, 3); verifyNonPartitionedTopicNeverCreated(topicNameStr); @@ -332,7 +390,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai // Case-1: normal topic. final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); // Verify: the result of get partitioned topic metadata. - PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join(); + PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join(); assertEquals(response.partitions, 3); // Verify: the behavior of topic creation. List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); @@ -347,7 +405,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1"; // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response2 = - client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join(); + client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join(); assertEquals(response2.partitions, 0); // Verify: the behavior of topic creation. List partitionedTopics2 = @@ -380,7 +438,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo // Case 1: normal topic. final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); // Verify: the result of get partitioned topic metadata. - PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join(); + PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join(); assertEquals(response.partitions, 0); // Verify: the behavior of topic creation. List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); @@ -392,7 +450,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1"; // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response2 = - client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join(); + client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join(); assertEquals(response2.partitions, 0); // Verify: the behavior of topic creation. List partitionedTopics2 = @@ -443,7 +501,7 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati final TopicName topicName = TopicName.get(topicNameStr); // Verify: the result of get partitioned topic metadata. try { - client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled) + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false) .join(); fail("Expect a not found exception"); } catch (Exception e) { @@ -496,7 +554,7 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config // Verify: the result of get partitioned topic metadata. try { PartitionedTopicMetadata topicMetadata = client - .getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled) + .getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false) .join(); log.info("Get topic metadata: {}", topicMetadata.partitions); fail("Expected a not found ex"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 55601ad4c6b1d..c90ad15242c85 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -136,6 +136,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() ((PulsarClientImpl) pulsarClient).setLookup(mockLookup); when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer( i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); + when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer( + i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); when(mockLookup.getBroker(any())).thenAnswer(ignored -> { InetSocketAddress brokerAddress = new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index b17565cfc0dfa..2e6d9c61bde79 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -148,7 +148,8 @@ public void testTransactionBufferLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false) + .get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() @@ -253,7 +254,8 @@ public void testPendingAckLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false) + .get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 57d709e9768c3..e25212e0108f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -208,7 +208,7 @@ public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Excep clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); Assert.fail("Expected an error that the broker version is too old."); } catch (Exception ex) { - Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker")); + Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported by the broker")); } // cleanup. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index c460fee11d0e6..513ee4d7e4edf 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import lombok.Getter; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -741,6 +742,30 @@ public NotSupportedException(String msg) { } } + /** + * Not supported exception thrown by Pulsar client. + */ + public static class FeatureNotSupportedException extends NotSupportedException { + + @Getter + private final FailedFeatureCheck failedFeatureCheck; + + public FeatureNotSupportedException(String msg, FailedFeatureCheck failedFeatureCheck) { + super(msg); + this.failedFeatureCheck = failedFeatureCheck; + } + } + + /** + * "supports_auth_refresh" was introduced at "2.6" and is no longer supported, so skip this enum. + * "supports_broker_entry_metadata" was introduced at "2.8" and is no longer supported, so skip this enum. + * "supports_partial_producer" was introduced at "2.10" and is no longer supported, so skip this enum. + * "supports_topic_watchers" was introduced at "2.11" and is no longer supported, so skip this enum. + */ + public enum FailedFeatureCheck { + SupportsGetPartitionedMetadataWithoutAutoCreation; + } + /** * Not allowed exception thrown by Pulsar client. */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index bf015c564b9cc..6ee6fafde1c25 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import static java.lang.String.format; +import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import io.netty.buffer.ByteBuf; import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; @@ -146,12 +147,13 @@ public CompletableFuture getBroker(TopicName topicName) { */ @Override public CompletableFuture getPartitionedTopicMetadata( - TopicName topicName, boolean metadataAutoCreationEnabled) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { final MutableObject newFutureCreated = new MutableObject<>(); try { return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> { CompletableFuture newFuture = getPartitionedTopicMetadata( - serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled); + serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled, + useFallbackForNonPIP344Brokers); newFutureCreated.setValue(newFuture); return newFuture; }); @@ -248,21 +250,32 @@ private CompletableFuture findBroker(InetSocketAddress socket } private CompletableFuture getPartitionedTopicMetadata(InetSocketAddress socketAddress, - TopicName topicName, boolean metadataAutoCreationEnabled) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { long startTime = System.nanoTime(); CompletableFuture partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { - partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The feature of" - + " getting partitions without auto-creation is not supported from the broker," - + " please upgrade the broker to the latest version.")); - return; + if (useFallbackForNonPIP344Brokers) { + log.info("[{}] Using original behavior of getPartitionedTopicMetadata(topic) in " + + "getPartitionedTopicMetadata(topic, false) " + + "since the target broker does not support PIP-344 and fallback is enabled.", topicName); + finalAutoCreationEnabled = true; + } else { + partitionFuture.completeExceptionally( + new PulsarClientException.FeatureNotSupportedException("The feature of " + + "getting partitions without auto-creation is not supported by the broker. " + + "Please upgrade the broker to version that supports PIP-344 to resolve this " + + "issue.", + SupportsGetPartitionedMetadataWithoutAutoCreation)); + return; + } } long requestId = client.newRequestId(); ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, - metadataAutoCreationEnabled); + finalAutoCreationEnabled); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 7197cf6be79d5..351025d426a39 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -107,7 +107,7 @@ public Consumer subscribe() throws PulsarClientException { private CompletableFuture checkDlqAlreadyExists(String topic) { CompletableFuture existsFuture = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata -> { + client.getPartitionedTopicMetadata(topic, false, true).thenAccept(metadata -> { TopicName topicName = TopicName.get(topic); if (topicName.isPersistent()) { // Either partitioned or non-partitioned, it exists. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 44ef4ac17ee75..4a5557fa869e4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -135,9 +135,13 @@ public CompletableFuture getBroker(TopicName topicName) { }); } + /** + * {@inheritDoc} + * @param useFallbackForNonPIP344Brokers HttpLookupService ignores this parameter + */ @Override public CompletableFuture getPartitionedTopicMetadata( - TopicName topicName, boolean metadataAutoCreationEnabled) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { long startTime = System.nanoTime(); String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 2fe457059c1e9..3367ae99cb1a2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -61,11 +61,19 @@ public interface LookupService extends AutoCloseable { /** * Returns {@link PartitionedTopicMetadata} for a given topic. * Note: this method will try to create the topic partitioned metadata if it does not exist. - * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, boolean)}}. + * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, boolean, boolean)}}. */ @Deprecated default CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { - return getPartitionedTopicMetadata(topicName, true); + return getPartitionedTopicMetadata(topicName, true, true); + } + + /** + * See the doc {@link #getPartitionedTopicMetadata(TopicName, boolean, boolean)}. + */ + default CompletableFuture getPartitionedTopicMetadata(TopicName topicName, + boolean metadataAutoCreationEnabled) { + return getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled, false); } /** @@ -80,10 +88,15 @@ default CompletableFuture getPartitionedTopicMetadata( * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. * For the result, see case 1. + * @param useFallbackForNonPIP344Brokers

If true, fallback to the prior behavior of the method + * {@link #getPartitionedTopicMetadata(TopicName)} if the broker does not support the PIP-344 feature + * 'supports_get_partitioned_metadata_without_auto_creation'. This parameter only affects the behavior when + * {@param metadataAutoCreationEnabled} is false.

* @version 3.3.0. */ CompletableFuture getPartitionedTopicMetadata(TopicName topicName, - boolean metadataAutoCreationEnabled); + boolean metadataAutoCreationEnabled, + boolean useFallbackForNonPIP344Brokers); /** * Returns current SchemaInfo {@link SchemaInfo} for a given topic. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index e8cbf71e500c9..3f5e501b28130 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -957,7 +957,7 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo CompletableFuture subscribeResult = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topicName, true) + client.getPartitionedTopicMetadata(topicName, true, false) .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, createTopicIfDoesNotExist)) .exceptionally(ex1 -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 4585b5328129b..d37c3a10e1607 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -387,7 +387,7 @@ public CompletableFuture> createProducerAsync(ProducerConfigurat private CompletableFuture checkPartitions(String topic, boolean forceNoPartitioned, @Nullable String producerNameForLog) { CompletableFuture checkPartitions = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic, !forceNoPartitioned).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, !forceNoPartitioned, true).thenAccept(metadata -> { if (forceNoPartitioned && metadata.partitions > 0) { String errorMsg = String.format("Can not create the producer[%s] for the topic[%s] that contains %s" + " partitions, but the producer does not support for a partitioned topic.", @@ -560,7 +560,7 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC String topic = conf.getSingleTopic(); - getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -710,7 +710,7 @@ protected CompletableFuture> createSingleTopicReaderAsync( CompletableFuture> readerFuture = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -1131,8 +1131,15 @@ public LookupService createLookup(String url) throws PulsarClientException { } } + /** + * @param useFallbackForNonPIP344Brokers

If true, fallback to the prior behavior of the method + * getPartitionedTopicMetadata if the broker does not support the PIP-344 + * feature 'supports_get_partitioned_metadata_without_auto_creation'. This + * parameter only affects the behavior when + * {@param metadataAutoCreationEnabled} is false.

+ */ public CompletableFuture getPartitionedTopicMetadata( - String topic, boolean metadataAutoCreationEnabled) { + String topic, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { CompletableFuture metadataFuture = new CompletableFuture<>(); @@ -1144,8 +1151,8 @@ public CompletableFuture getPartitionedTopicMetadata( .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) .setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .create(); - getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, - metadataFuture, new ArrayList<>(), metadataAutoCreationEnabled); + getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture, new ArrayList<>(), + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } @@ -1157,10 +1164,11 @@ private void getPartitionedTopicMetadata(TopicName topicName, AtomicLong remainingTime, CompletableFuture future, List previousExceptions, - boolean metadataAutoCreationEnabled) { + boolean metadataAutoCreationEnabled, + boolean useFallbackForNonPIP344Brokers) { long startTime = System.nanoTime(); - CompletableFuture queryFuture = - lookup.getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled); + CompletableFuture queryFuture = lookup.getPartitionedTopicMetadata(topicName, + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); queryFuture.thenAccept(future::complete).exceptionally(e -> { remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); long nextDelay = Math.min(backoff.next(), remainingTime.get()); @@ -1181,7 +1189,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions, - metadataAutoCreationEnabled); + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); @@ -1189,7 +1197,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, @Override public CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) { - return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled).thenApply(metadata -> { + return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, false).thenApply(metadata -> { if (metadata.partitions > 0) { TopicName topicName = TopicName.get(topic); List partitions = new ArrayList<>(metadata.partitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 45a3ad4f978b1..ce19cbf873eea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -80,7 +80,7 @@ public void start() throws TransactionCoordinatorClientException { public CompletableFuture startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { return pulsarClient.getPartitionedTopicMetadata( - SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true) + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true, false) .thenCompose(partitionMeta -> { List> connectFutureList = new ArrayList<>(); if (LOG.isDebugEnabled()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 191124bb7b002..02a4d2ebba8c1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -154,7 +154,7 @@ private MultiTopicsConsumerImpl createMultiTopicsConsumer( int completionDelayMillis = 100; Schema schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())) .thenAnswer(invocation -> createDelayedCompletedFuture( new PartitionedTopicMetadata(), completionDelayMillis)); MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl( @@ -203,7 +203,7 @@ public void testConsumerCleanupOnSubscribeFailure() { int completionDelayMillis = 10; Schema schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())) .thenAnswer(invocation -> createExceptionFuture( new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis)); CompletableFuture> completeFuture = new CompletableFuture<>(); @@ -240,7 +240,7 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc // Simulate non partitioned topics PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0); - when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(metadata)); CompletableFuture> completeFuture = new CompletableFuture<>(); @@ -252,7 +252,7 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc // getPartitionedTopicMetadata should have been called only the first time, for each of the 3 topics, // but not anymore since the topics are not partitioned. - verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean()); + verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean()); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 3e897ed89f287..103254a6b90a4 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -108,7 +108,7 @@ public void testConsumerIsClosed() throws Exception { nullable(String.class))) .thenReturn(CompletableFuture.completedFuture( new GetTopicsResult(Collections.emptyList(), null, false, true))); - when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean())) + when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())); when(lookup.getBroker(any())) .thenReturn(CompletableFuture.completedFuture(new LookupTopicResult( diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 5a7eb582eb5b1..5067ed64079c9 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -295,6 +295,7 @@ message CommandConnect { optional string proxy_version = 11; // Version of the proxy. Should only be forwarded by a proxy. } +// Please also add a new enum for the class "PulsarClientException.FailedFeatureCheck" when adding a new feature flag. message FeatureFlags { optional bool supports_auth_refresh = 1 [default = false]; optional bool supports_broker_entry_metadata = 2 [default = false];