From 2993d389f5afd9b0c3398530644ac935a224f530 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 7 Aug 2024 16:29:57 +0800 Subject: [PATCH 01/15] - --- .../broker/namespace/NamespaceService.java | 24 ++++++++++++++++++ .../client/impl/ConsumerBuilderImpl.java | 7 ++++++ .../pulsar/client/impl/PulsarClientImpl.java | 25 +++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index ec4c907234ab6..442823b4e74b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -58,6 +58,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.client.impl.BinaryProtoLookupService; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -1494,6 +1495,29 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St || actEx instanceof PulsarClientException.TopicDoesNotExistException || actEx instanceof PulsarAdminException.NotFoundException) { return CompletableFuture.completedFuture(false); + } else if (actEx instanceof PulsarClientException.NotSupportedException){ + /** + * Summary: For compatibility of + * {@link BinaryProtoLookupService#getPartitionedTopicMetadata(TopicName, boolean)}. + * + * Explanation: + * 1. Reason of why getting the error here. + * The feature method above was supported at "3.0.6" and "3.3.1", before that the API + * "getPartitionedTopicMetadata" will trigger a creation for partitioned topic + * metadata automatically even if you just want query it. So the brokers whose version + * is less than "3.0.6" and "3.3.1" do not support the new API. + * 2. The conditions to lead this error occur. + * There are 2 brokers in a cluster, and the version is less than "3.0.1", rolling + * upgrade brokers to "3.0.6". After the first broker restarted, there is one broker + * with version "3.0.6" and another is "3.0.1", and when the internal client tries + * to call "getPartitionedTopicMetadata" to the broker with lower version, it will + * get this error. + * 3. Compatibility + * Rollback to the original behavior before the fix #22838. Without the fix #22838, + * there is an issue that may cause a non-partitioned non-persistent topic and + * a partitioned non-persistent topic with the same name to exist at the same time. + */ + return CompletableFuture.completedFuture(false); } else { log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); return CompletableFuture.failedFuture(ex); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 7197cf6be79d5..5a323dbc8af44 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -122,6 +122,13 @@ private CompletableFuture checkDlqAlreadyExists(String topic) { || actEx instanceof PulsarClientException.TopicDoesNotExistException || actEx instanceof PulsarAdminException.NotFoundException) { existsFuture.complete(false); + } else if (actEx instanceof PulsarClientException.NotSupportedException) { + existsFuture.completeExceptionally(new PulsarClientException.NotSupportedException("There is a bug that" + + " the Retry/DLQ consumer will still trigger a Retry/DLQ topic with the old rule" + + " ({namespace}/{subscription}-RETRY/DLQ), but the rule was changed to" + + " {namespace}/{topic}-{subscription}-RETRY/DLQ after 2.8.0. Please upgrade the brokers' version" + + " to >=3.0.6 or >=3.3.1; another solution is use HTTP protocol service URL instead of Binary" + + " protocol service URL when building Pulsar Client")); } else { existsFuture.completeExceptionally(ex); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 4585b5328129b..aff52a01b0155 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -404,6 +404,31 @@ private CompletableFuture checkPartitions(String topic, boolean forceNo || actEx instanceof PulsarClientException.TopicDoesNotExistException || actEx instanceof PulsarAdminException.NotFoundException) { checkPartitions.complete(0); + } else if (actEx instanceof PulsarClientException.NotSupportedException) { + /** + * Summary: For compatibility of + * {@link BinaryProtoLookupService#getPartitionedTopicMetadata(TopicName, boolean)}. + * + * Explanation: + * 1. This error will only occur when using Geo-Replication, and one version of the two cluster os + * larger or equals than "3.0.6" and "3.3.1" and another is smaller than "3.0.6" and "3.3.1". + * 2. Reason of why getting the error here. + * The feature method above was supported at "3.0.6" and "3.3.1", before that the API + * "getPartitionedTopicMetadata" will trigger a creation for partitioned topic + * metadata automatically even if you just want query it. So the brokers whose version + * is less than "3.0.6" and "3.3.1" do not support the new API. + * 3. Compatibility + * Skip the check of comparing of topic's partitions, and force connect to the non-partitioned topic, + * it may cause both partitioned topic and non-partitioned topic to exist at the same time. But this + * is still better than the behavior before the fix #22983, without the fix #22838, there is an issue + * that may cause replication stuck and topics being created in confusion, see more details in + * #22838's motivation. + */ + log.error("{} {} Since the target cluster does not support to get topic's partitions without" + + " auto-creation, skip the partitions check. It may cause both partitioned topic and" + + " non-partitioned topic to exist at the same time, please upgrade clusters to the version" + + " that >=3.0.6 or >=3.3.1", topic, producerNameForLog); + checkPartitions.complete(0); } else { checkPartitions.completeExceptionally(ex); } From 9b2e7be77df81c8b321e97629f75246c59d6a0c5 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 7 Aug 2024 21:48:03 +0800 Subject: [PATCH 02/15] fix typo --- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index aff52a01b0155..1200460c0ae9a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -410,7 +410,7 @@ private CompletableFuture checkPartitions(String topic, boolean forceNo * {@link BinaryProtoLookupService#getPartitionedTopicMetadata(TopicName, boolean)}. * * Explanation: - * 1. This error will only occur when using Geo-Replication, and one version of the two cluster os + * 1. This error will only occur when using Geo-Replication, and one version of the two cluster is * larger or equals than "3.0.6" and "3.3.1" and another is smaller than "3.0.6" and "3.3.1". * 2. Reason of why getting the error here. * The feature method above was supported at "3.0.6" and "3.3.1", before that the API From fccaa2d6ac68e80fecec3ebb0a829d3499edceea Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 8 Aug 2024 00:40:49 +0800 Subject: [PATCH 03/15] checkstyle --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 442823b4e74b8..646169d366b1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -58,7 +58,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.client.impl.BinaryProtoLookupService; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -1498,7 +1497,8 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St } else if (actEx instanceof PulsarClientException.NotSupportedException){ /** * Summary: For compatibility of - * {@link BinaryProtoLookupService#getPartitionedTopicMetadata(TopicName, boolean)}. + * {@link org.apache.pulsar.client.impl.BinaryProtoLookupService + * #getPartitionedTopicMetadata(TopicName, boolean)}. * * Explanation: * 1. Reason of why getting the error here. From 5a72f517d21b4b41f7740536e2361a1ef68ed604 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 8 Aug 2024 10:19:33 +0800 Subject: [PATCH 04/15] address comments --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 4 ++++ .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 646169d366b1a..bc08059f9c206 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1517,6 +1517,10 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St * there is an issue that may cause a non-partitioned non-persistent topic and * a partitioned non-persistent topic with the same name to exist at the same time. */ + log.warn("{} The versions of the brokers in the same cluster are different( some are" + + " less than 3.0.6), rollback to the original behavior before the bug fix that" + + " may cause a non-partitioned non-persistent topic and a partitioned" + + " non-persistent topic with the same name to exist at the same time.", topic); return CompletableFuture.completedFuture(false); } else { log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 1200460c0ae9a..9ce02c6106ae9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -424,7 +424,7 @@ private CompletableFuture checkPartitions(String topic, boolean forceNo * that may cause replication stuck and topics being created in confusion, see more details in * #22838's motivation. */ - log.error("{} {} Since the target cluster does not support to get topic's partitions without" + log.warn("{} {} Since the target cluster does not support to get topic's partitions without" + " auto-creation, skip the partitions check. It may cause both partitioned topic and" + " non-partitioned topic to exist at the same time, please upgrade clusters to the version" + " that >=3.0.6 or >=3.3.1", topic, producerNameForLog); From c15a7618f0505cbcf0bdb40313c9085164dc6be3 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 9 Aug 2024 22:22:11 +0800 Subject: [PATCH 05/15] improve the implementation --- pip/a.xtx | 0 .../broker/namespace/NamespaceService.java | 44 ++++------ .../GetPartitionMetadataMultiBrokerTest.java | 81 +++++++++++++++++++ .../admin/GetPartitionMetadataTest.java | 64 +++++++++++++-- .../broker/admin/TopicAutoCreationTest.java | 2 +- .../buffer/TransactionLowWaterMarkTest.java | 6 +- .../client/api/BrokerServiceLookupTest.java | 2 +- .../client/api/PulsarClientException.java | 25 ++++++ .../client/impl/BinaryProtoLookupService.java | 28 +++++-- .../client/impl/ConsumerBuilderImpl.java | 9 +-- .../pulsar/client/impl/HttpLookupService.java | 6 +- .../pulsar/client/impl/LookupService.java | 11 ++- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- .../pulsar/client/impl/PulsarClientImpl.java | 53 ++++-------- .../TransactionCoordinatorClientImpl.java | 2 +- .../impl/MultiTopicsConsumerImplTest.java | 8 +- .../client/impl/PulsarClientImplTest.java | 2 +- pulsar-common/src/main/proto/PulsarApi.proto | 1 + 18 files changed, 242 insertions(+), 104 deletions(-) create mode 100644 pip/a.xtx diff --git a/pip/a.xtx b/pip/a.xtx new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index bc08059f9c206..214d7d8e233bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -24,6 +24,8 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck + .SupportsGetPartitionedMetadataWithoutAutoCreation; import com.google.common.hash.Hashing; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -1486,7 +1488,7 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St brokerUrl = lookupData.getBrokerUrl(); } return pulsarClient.getLookup(brokerUrl) - .getPartitionedTopicMetadata(topicName, false) + .getPartitionedTopicMetadata(topicName, false, false) .thenApply(metadata -> true) .exceptionallyCompose(ex -> { Throwable actEx = FutureUtil.unwrapCompletionException(ex); @@ -1494,34 +1496,18 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St || actEx instanceof PulsarClientException.TopicDoesNotExistException || actEx instanceof PulsarAdminException.NotFoundException) { return CompletableFuture.completedFuture(false); - } else if (actEx instanceof PulsarClientException.NotSupportedException){ - /** - * Summary: For compatibility of - * {@link org.apache.pulsar.client.impl.BinaryProtoLookupService - * #getPartitionedTopicMetadata(TopicName, boolean)}. - * - * Explanation: - * 1. Reason of why getting the error here. - * The feature method above was supported at "3.0.6" and "3.3.1", before that the API - * "getPartitionedTopicMetadata" will trigger a creation for partitioned topic - * metadata automatically even if you just want query it. So the brokers whose version - * is less than "3.0.6" and "3.3.1" do not support the new API. - * 2. The conditions to lead this error occur. - * There are 2 brokers in a cluster, and the version is less than "3.0.1", rolling - * upgrade brokers to "3.0.6". After the first broker restarted, there is one broker - * with version "3.0.6" and another is "3.0.1", and when the internal client tries - * to call "getPartitionedTopicMetadata" to the broker with lower version, it will - * get this error. - * 3. Compatibility - * Rollback to the original behavior before the fix #22838. Without the fix #22838, - * there is an issue that may cause a non-partitioned non-persistent topic and - * a partitioned non-persistent topic with the same name to exist at the same time. - */ - log.warn("{} The versions of the brokers in the same cluster are different( some are" - + " less than 3.0.6), rollback to the original behavior before the bug fix that" - + " may cause a non-partitioned non-persistent topic and a partitioned" - + " non-persistent topic with the same name to exist at the same time.", topic); - return CompletableFuture.completedFuture(false); + } else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){ + if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) { + // Since the feature PIP-344 was not supported, just rollback the behavior to the + // original as before the fix https://github.com/apache/pulsar/pull/22838. + log.info("{} Checking if a non-persistent non-partitioned topic exists was" + + " roll-backed to the original before #22838, because a broker does not" + + " support a new API. see more detail #23136", topic); + return CompletableFuture.completedFuture(false); + } else { + log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); + return CompletableFuture.failedFuture(ex); + } } else { log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); return CompletableFuture.failedFuture(ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java index 28cf91ee165e2..d4a9ca516d1f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java @@ -18,20 +18,32 @@ */ package org.apache.pulsar.broker.admin; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.lang.reflect.Field; import java.net.URL; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker-admin") @@ -219,4 +231,73 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config super.testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(configAllowAutoTopicCreation, paramMetadataAutoCreationEnabled, isUsingHttpLookup); } + + @DataProvider(name = "autoCreationParamsAllForNonPersistentTopic") + public Object[][] autoCreationParamsAllForNonPersistentTopic(){ + return new Object[][]{ + // configAllowAutoTopicCreation, paramCreateIfAutoCreationEnabled, isUsingHttpLookup. + {true, true, true}, + {true, true, false}, + {true, false, true}, + {true, false, false}, + {false, true, true}, + {false, true, false}, + {false, false, true}, + {false, false, false} + }; + } + + /** + * {@inheritDoc} + */ + @Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic") + public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation, + boolean paramMetadataAutoCreationEnabled, + boolean isUsingHttpLookup) throws Exception { + modifyTopicAutoCreation(configAllowAutoTopicCreation, TopicType.PARTITIONED, 3); + + // Initialize the connections of internal Pulsar Client. + PulsarClientImpl client1 = (PulsarClientImpl) pulsar1.getClient(); + PulsarClientImpl client2 = (PulsarClientImpl) pulsar2.getClient(); + client1.getLookup(pulsar2.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1")); + client2.getLookup(pulsar1.getBrokerServiceUrl()).getBroker(TopicName.get(DEFAULT_NS + "/tp1")); + + // Inject a not support flag into the connections initialized. + Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation"); + field.setAccessible(true); + for (PulsarClientImpl client : Arrays.asList(client1, client2)) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, false); + } + } + // Verify: the method "getPartitionsForTopic(topic, false, true)" will be roll-backed + // to "getPartitionsForTopic(topic, true)". + int lookupPermitsBefore = getLookupRequestPermits(); + + // Verify: we will not get an un-support error. + PulsarClientImpl[] clientArray = getClientsToTest(isUsingHttpLookup); + for (PulsarClientImpl client : clientArray) { + final String topicNameStr = BrokerTestUtil.newUniqueName("non-persistent://" + DEFAULT_NS + "/tp"); + try { + PartitionedTopicMetadata topicMetadata = client + .getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false) + .join(); + log.info("Get topic metadata: {}", topicMetadata.partitions); + } catch (Exception ex) { + Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex); + assertTrue(unwrapEx instanceof PulsarClientException.TopicDoesNotExistException + || unwrapEx instanceof PulsarClientException.NotFoundException); + assertFalse(ex.getMessage().contains("getting partitions without auto-creation is not supported from" + + " the broker")); + } + } + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index bf99b172829a7..4e6fdc0ba6784 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -23,10 +23,12 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.lang.reflect.Field; import java.net.URL; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; @@ -34,6 +36,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -225,6 +229,50 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t } } + @Test(dataProvider = "topicDomains") + public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception { + modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3); + // Initialize connections. + String pulsarUrl = pulsar1.getBrokerServiceUrl(); + PulsarClientImpl[] clients = getClientsToTest(false); + for (PulsarClientImpl client : clients) { + client.getLookup(pulsarUrl).getBroker(TopicName.get(DEFAULT_NS + "/tp1")); + } + // Inject a not support flag into the connections initialized. + Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation"); + field.setAccessible(true); + for (PulsarClientImpl client : clients) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, false); + } + } + + // Verify: the method "getPartitionsForTopic(topic, false, true)" will be roll-backed to + // "getPartitionsForTopic(topic)". + int lookupPermitsBefore = getLookupRequestPermits(); + for (PulsarClientImpl client : clients) { + // Verify: the behavior of topic creation. + final String tp = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); + client.getPartitionedTopicMetadata(tp, false, true).join(); + Optional metadata1 = pulsar1.getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources() + .getPartitionedTopicMetadataAsync(TopicName.get(tp), true).join(); + assertTrue(metadata1.isPresent()); + assertEquals(metadata1.get().partitions, 3); + + // Verify: lookup semaphore has been releases. + Awaitility.await().untilAsserted(() -> { + assertEquals(getLookupRequestPermits(), lookupPermitsBefore); + }); + + // Cleanup. + admin1.topics().deletePartitionedTopic(tp, false); + } + } + @DataProvider(name = "autoCreationParamsAll") public Object[][] autoCreationParamsAll(){ return new Object[][]{ @@ -265,7 +313,7 @@ public void testGetMetadataIfNonPartitionedTopicExists(boolean configAllowAutoTo for (PulsarClientImpl client : clientArray) { // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response = - client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join(); + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join(); assertEquals(response.partitions, 0); List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); assertFalse(partitionedTopics.contains(topicNameStr)); @@ -298,7 +346,7 @@ public void testGetMetadataIfPartitionedTopicExists(boolean configAllowAutoTopic for (PulsarClientImpl client : clientArray) { // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response = - client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled).join(); + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false).join(); assertEquals(response.partitions, 3); verifyNonPartitionedTopicNeverCreated(topicNameStr); @@ -332,7 +380,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai // Case-1: normal topic. final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); // Verify: the result of get partitioned topic metadata. - PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join(); + PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join(); assertEquals(response.partitions, 3); // Verify: the behavior of topic creation. List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); @@ -347,7 +395,7 @@ public void testAutoCreatePartitionedTopic(boolean isUsingHttpLookup, TopicDomai topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1"; // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response2 = - client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join(); + client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join(); assertEquals(response2.partitions, 0); // Verify: the behavior of topic creation. List partitionedTopics2 = @@ -380,7 +428,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo // Case 1: normal topic. final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.value() + "://" + DEFAULT_NS + "/tp"); // Verify: the result of get partitioned topic metadata. - PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true).join(); + PartitionedTopicMetadata response = client.getPartitionedTopicMetadata(topicNameStr, true, false).join(); assertEquals(response.partitions, 0); // Verify: the behavior of topic creation. List partitionedTopics = admin1.topics().getPartitionedTopicList("public/default"); @@ -392,7 +440,7 @@ public void testAutoCreateNonPartitionedTopic(boolean isUsingHttpLookup, TopicDo topicDomain.value() + "://" + DEFAULT_NS + "/tp") + "-partition-1"; // Verify: the result of get partitioned topic metadata. PartitionedTopicMetadata response2 = - client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true).join(); + client.getPartitionedTopicMetadata(topicNameStrWithSuffix, true, false).join(); assertEquals(response2.partitions, 0); // Verify: the behavior of topic creation. List partitionedTopics2 = @@ -443,7 +491,7 @@ public void testGetMetadataIfNotAllowedCreate(boolean configAllowAutoTopicCreati final TopicName topicName = TopicName.get(topicNameStr); // Verify: the result of get partitioned topic metadata. try { - client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled) + client.getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false) .join(); fail("Expect a not found exception"); } catch (Exception e) { @@ -496,7 +544,7 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config // Verify: the result of get partitioned topic metadata. try { PartitionedTopicMetadata topicMetadata = client - .getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled) + .getPartitionedTopicMetadata(topicNameStr, paramMetadataAutoCreationEnabled, false) .join(); log.info("Get topic metadata: {}", topicMetadata.partitions); fail("Expected a not found ex"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 55601ad4c6b1d..354c26078af88 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -134,7 +134,7 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() // we want to skip the "lookup" phase, because it is blocked by the HTTP API LookupService mockLookup = mock(LookupService.class); ((PulsarClientImpl) pulsarClient).setLookup(mockLookup); - when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer( + when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer( i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); when(mockLookup.getBroker(any())).thenAnswer(ignored -> { InetSocketAddress brokerAddress = diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index b17565cfc0dfa..f7e51245136c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -148,7 +148,8 @@ public void testTransactionBufferLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false) + .get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() @@ -253,7 +254,8 @@ public void testPendingAckLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false).get(); + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false) + .get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { lowWaterMarkTxn = pulsarClient.newTransaction() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index 157df1185307a..b15b02255e0c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -933,7 +933,7 @@ public void testMergeGetPartitionedMetadataRequests() throws Exception { // Verify the request is works after merge the requests. List> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { - futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false)); + futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false, false)); } for (CompletableFuture future : futures) { assertEquals(future.join().partitions, topicPartitions); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index c460fee11d0e6..513ee4d7e4edf 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import lombok.Getter; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -741,6 +742,30 @@ public NotSupportedException(String msg) { } } + /** + * Not supported exception thrown by Pulsar client. + */ + public static class FeatureNotSupportedException extends NotSupportedException { + + @Getter + private final FailedFeatureCheck failedFeatureCheck; + + public FeatureNotSupportedException(String msg, FailedFeatureCheck failedFeatureCheck) { + super(msg); + this.failedFeatureCheck = failedFeatureCheck; + } + } + + /** + * "supports_auth_refresh" was introduced at "2.6" and is no longer supported, so skip this enum. + * "supports_broker_entry_metadata" was introduced at "2.8" and is no longer supported, so skip this enum. + * "supports_partial_producer" was introduced at "2.10" and is no longer supported, so skip this enum. + * "supports_topic_watchers" was introduced at "2.11" and is no longer supported, so skip this enum. + */ + public enum FailedFeatureCheck { + SupportsGetPartitionedMetadataWithoutAutoCreation; + } + /** * Not allowed exception thrown by Pulsar client. */ diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index bf015c564b9cc..05e110ffd4374 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.impl; +import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck + .SupportsGetPartitionedMetadataWithoutAutoCreation; import static java.lang.String.format; import io.netty.buffer.ByteBuf; import io.opentelemetry.api.common.Attributes; @@ -146,12 +148,13 @@ public CompletableFuture getBroker(TopicName topicName) { */ @Override public CompletableFuture getPartitionedTopicMetadata( - TopicName topicName, boolean metadataAutoCreationEnabled) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { final MutableObject newFutureCreated = new MutableObject<>(); try { return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> { CompletableFuture newFuture = getPartitionedTopicMetadata( - serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled); + serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled, + acceptFallbackIfNotSupport); newFutureCreated.setValue(newFuture); return newFuture; }); @@ -248,21 +251,30 @@ private CompletableFuture findBroker(InetSocketAddress socket } private CompletableFuture getPartitionedTopicMetadata(InetSocketAddress socketAddress, - TopicName topicName, boolean metadataAutoCreationEnabled) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { long startTime = System.nanoTime(); CompletableFuture partitionFuture = new CompletableFuture<>(); client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { - partitionFuture.completeExceptionally(new PulsarClientException.NotSupportedException("The feature of" - + " getting partitions without auto-creation is not supported from the broker," - + " please upgrade the broker to the latest version.")); - return; + if (acceptFallbackIfNotSupport) { + log.info("{} Roll-back getPartitionedTopicMetadata(topic, false) to" + + " getPartitionedTopicMetadata(topic) since broker does not support.", topicName); + finalAutoCreationEnabled = true; + } else { + partitionFuture.completeExceptionally( + new PulsarClientException.FeatureNotSupportedException("The feature of" + + " getting partitions without auto-creation is not supported from the broker," + + " please upgrade the broker to the latest version.", + SupportsGetPartitionedMetadataWithoutAutoCreation)); + return; + } } long requestId = client.newRequestId(); ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, - metadataAutoCreationEnabled); + finalAutoCreationEnabled); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { histoGetTopicMetadata.recordFailure(System.nanoTime() - startTime); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 5a323dbc8af44..351025d426a39 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -107,7 +107,7 @@ public Consumer subscribe() throws PulsarClientException { private CompletableFuture checkDlqAlreadyExists(String topic) { CompletableFuture existsFuture = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topic, false).thenAccept(metadata -> { + client.getPartitionedTopicMetadata(topic, false, true).thenAccept(metadata -> { TopicName topicName = TopicName.get(topic); if (topicName.isPersistent()) { // Either partitioned or non-partitioned, it exists. @@ -122,13 +122,6 @@ private CompletableFuture checkDlqAlreadyExists(String topic) { || actEx instanceof PulsarClientException.TopicDoesNotExistException || actEx instanceof PulsarAdminException.NotFoundException) { existsFuture.complete(false); - } else if (actEx instanceof PulsarClientException.NotSupportedException) { - existsFuture.completeExceptionally(new PulsarClientException.NotSupportedException("There is a bug that" - + " the Retry/DLQ consumer will still trigger a Retry/DLQ topic with the old rule" - + " ({namespace}/{subscription}-RETRY/DLQ), but the rule was changed to" - + " {namespace}/{topic}-{subscription}-RETRY/DLQ after 2.8.0. Please upgrade the brokers' version" - + " to >=3.0.6 or >=3.3.1; another solution is use HTTP protocol service URL instead of Binary" - + " protocol service URL when building Pulsar Client")); } else { existsFuture.completeExceptionally(ex); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 44ef4ac17ee75..e6266d5ce056d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -135,9 +135,13 @@ public CompletableFuture getBroker(TopicName topicName) { }); } + /** + * {@inheritDoc} + * @param acceptFallbackIfNotSupport HttpLookupService supports every request, so this param will be ignored. + */ @Override public CompletableFuture getPartitionedTopicMetadata( - TopicName topicName, boolean metadataAutoCreationEnabled) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { long startTime = System.nanoTime(); String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 2fe457059c1e9..13335032b6b58 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -61,11 +61,11 @@ public interface LookupService extends AutoCloseable { /** * Returns {@link PartitionedTopicMetadata} for a given topic. * Note: this method will try to create the topic partitioned metadata if it does not exist. - * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, boolean)}}. + * @deprecated Please call {{@link #getPartitionedTopicMetadata(TopicName, boolean, boolean)}}. */ @Deprecated default CompletableFuture getPartitionedTopicMetadata(TopicName topicName) { - return getPartitionedTopicMetadata(topicName, true); + return getPartitionedTopicMetadata(topicName, true, true); } /** @@ -80,10 +80,15 @@ default CompletableFuture getPartitionedTopicMetadata( * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. * For the result, see case 1. + * @param acceptFallbackIfNotSupport roll-back to the original method + * {@link #getPartitionedTopicMetadata(TopicName)} if brokers do not support + * "getPartitionedTopicMetadata(topic, false)". This param only affects when the + * {@param metadataAutoCreationEnabled} is "false". * @version 3.3.0. */ CompletableFuture getPartitionedTopicMetadata(TopicName topicName, - boolean metadataAutoCreationEnabled); + boolean metadataAutoCreationEnabled, + boolean acceptFallbackIfNotSupport); /** * Returns current SchemaInfo {@link SchemaInfo} for a given topic. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index e8cbf71e500c9..3f5e501b28130 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -957,7 +957,7 @@ public CompletableFuture subscribeAsync(String topicName, boolean createTo CompletableFuture subscribeResult = new CompletableFuture<>(); - client.getPartitionedTopicMetadata(topicName, true) + client.getPartitionedTopicMetadata(topicName, true, false) .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions, createTopicIfDoesNotExist)) .exceptionally(ex1 -> { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 9ce02c6106ae9..36a933c105dc2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -387,7 +387,7 @@ public CompletableFuture> createProducerAsync(ProducerConfigurat private CompletableFuture checkPartitions(String topic, boolean forceNoPartitioned, @Nullable String producerNameForLog) { CompletableFuture checkPartitions = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic, !forceNoPartitioned).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, !forceNoPartitioned, true).thenAccept(metadata -> { if (forceNoPartitioned && metadata.partitions > 0) { String errorMsg = String.format("Can not create the producer[%s] for the topic[%s] that contains %s" + " partitions, but the producer does not support for a partitioned topic.", @@ -404,31 +404,6 @@ private CompletableFuture checkPartitions(String topic, boolean forceNo || actEx instanceof PulsarClientException.TopicDoesNotExistException || actEx instanceof PulsarAdminException.NotFoundException) { checkPartitions.complete(0); - } else if (actEx instanceof PulsarClientException.NotSupportedException) { - /** - * Summary: For compatibility of - * {@link BinaryProtoLookupService#getPartitionedTopicMetadata(TopicName, boolean)}. - * - * Explanation: - * 1. This error will only occur when using Geo-Replication, and one version of the two cluster is - * larger or equals than "3.0.6" and "3.3.1" and another is smaller than "3.0.6" and "3.3.1". - * 2. Reason of why getting the error here. - * The feature method above was supported at "3.0.6" and "3.3.1", before that the API - * "getPartitionedTopicMetadata" will trigger a creation for partitioned topic - * metadata automatically even if you just want query it. So the brokers whose version - * is less than "3.0.6" and "3.3.1" do not support the new API. - * 3. Compatibility - * Skip the check of comparing of topic's partitions, and force connect to the non-partitioned topic, - * it may cause both partitioned topic and non-partitioned topic to exist at the same time. But this - * is still better than the behavior before the fix #22983, without the fix #22838, there is an issue - * that may cause replication stuck and topics being created in confusion, see more details in - * #22838's motivation. - */ - log.warn("{} {} Since the target cluster does not support to get topic's partitions without" - + " auto-creation, skip the partitions check. It may cause both partitioned topic and" - + " non-partitioned topic to exist at the same time, please upgrade clusters to the version" - + " that >=3.0.6 or >=3.3.1", topic, producerNameForLog); - checkPartitions.complete(0); } else { checkPartitions.completeExceptionally(ex); } @@ -585,7 +560,7 @@ private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerC String topic = conf.getSingleTopic(); - getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -735,7 +710,7 @@ protected CompletableFuture> createSingleTopicReaderAsync( CompletableFuture> readerFuture = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { + getPartitionedTopicMetadata(topic, true, false).thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } @@ -1156,8 +1131,13 @@ public LookupService createLookup(String url) throws PulsarClientException { } } + /** + * @param acceptFallbackIfNotSupport roll-back to the original method {@link #getPartitionsForTopic(String)} if + * brokers do not support "getPartitionsForTopic(topic, false)". This param only affects when the + * {@param metadataAutoCreationEnabled} is "false". + */ public CompletableFuture getPartitionedTopicMetadata( - String topic, boolean metadataAutoCreationEnabled) { + String topic, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { CompletableFuture metadataFuture = new CompletableFuture<>(); @@ -1169,8 +1149,8 @@ public CompletableFuture getPartitionedTopicMetadata( .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) .setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .create(); - getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, - metadataFuture, new ArrayList<>(), metadataAutoCreationEnabled); + getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture, new ArrayList<>(), + metadataAutoCreationEnabled, acceptFallbackIfNotSupport); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } @@ -1182,10 +1162,11 @@ private void getPartitionedTopicMetadata(TopicName topicName, AtomicLong remainingTime, CompletableFuture future, List previousExceptions, - boolean metadataAutoCreationEnabled) { + boolean metadataAutoCreationEnabled, + boolean acceptFallbackIfNotSupport) { long startTime = System.nanoTime(); - CompletableFuture queryFuture = - lookup.getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled); + CompletableFuture queryFuture = lookup.getPartitionedTopicMetadata(topicName, + metadataAutoCreationEnabled, acceptFallbackIfNotSupport); queryFuture.thenAccept(future::complete).exceptionally(e -> { remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); long nextDelay = Math.min(backoff.next(), remainingTime.get()); @@ -1206,7 +1187,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions, - metadataAutoCreationEnabled); + metadataAutoCreationEnabled, acceptFallbackIfNotSupport); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); @@ -1214,7 +1195,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, @Override public CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) { - return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled).thenApply(metadata -> { + return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, false).thenApply(metadata -> { if (metadata.partitions > 0) { TopicName topicName = TopicName.get(topic); List partitions = new ArrayList<>(metadata.partitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 45a3ad4f978b1..ce19cbf873eea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -80,7 +80,7 @@ public void start() throws TransactionCoordinatorClientException { public CompletableFuture startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { return pulsarClient.getPartitionedTopicMetadata( - SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true) + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true, false) .thenCompose(partitionMeta -> { List> connectFutureList = new ArrayList<>(); if (LOG.isDebugEnabled()) { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java index 191124bb7b002..02a4d2ebba8c1 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java @@ -154,7 +154,7 @@ private MultiTopicsConsumerImpl createMultiTopicsConsumer( int completionDelayMillis = 100; Schema schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())) .thenAnswer(invocation -> createDelayedCompletedFuture( new PartitionedTopicMetadata(), completionDelayMillis)); MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl( @@ -203,7 +203,7 @@ public void testConsumerCleanupOnSubscribeFailure() { int completionDelayMillis = 10; Schema schema = Schema.BYTES; PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor); - when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())) .thenAnswer(invocation -> createExceptionFuture( new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis)); CompletableFuture> completeFuture = new CompletableFuture<>(); @@ -240,7 +240,7 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc // Simulate non partitioned topics PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(0); - when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean())) + when(clientMock.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(metadata)); CompletableFuture> completeFuture = new CompletableFuture<>(); @@ -252,7 +252,7 @@ public void testDontCheckForPartitionsUpdatesOnNonPartitionedTopics() throws Exc // getPartitionedTopicMetadata should have been called only the first time, for each of the 3 topics, // but not anymore since the topics are not partitioned. - verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean()); + verify(clientMock, times(3)).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean()); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 3e897ed89f287..103254a6b90a4 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -108,7 +108,7 @@ public void testConsumerIsClosed() throws Exception { nullable(String.class))) .thenReturn(CompletableFuture.completedFuture( new GetTopicsResult(Collections.emptyList(), null, false, true))); - when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean())) + when(lookup.getPartitionedTopicMetadata(any(TopicName.class), anyBoolean(), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())); when(lookup.getBroker(any())) .thenReturn(CompletableFuture.completedFuture(new LookupTopicResult( diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 5a7eb582eb5b1..5067ed64079c9 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -295,6 +295,7 @@ message CommandConnect { optional string proxy_version = 11; // Version of the proxy. Should only be forwarded by a proxy. } +// Please also add a new enum for the class "PulsarClientException.FailedFeatureCheck" when adding a new feature flag. message FeatureFlags { optional bool supports_auth_refresh = 1 [default = false]; optional bool supports_broker_entry_metadata = 2 [default = false]; From eb2b78e3237ea379c62e9e2a22159c8a1b908db2 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 9 Aug 2024 22:26:39 +0800 Subject: [PATCH 06/15] remove unnecessary file --- pip/a.xtx | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 pip/a.xtx diff --git a/pip/a.xtx b/pip/a.xtx deleted file mode 100644 index e69de29bb2d1d..0000000000000 From 7ca3c61280b90c4895d61cc98671df14ba5f704f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 9 Aug 2024 22:27:35 +0800 Subject: [PATCH 07/15] remove unnecessary codes --- .../broker/admin/GetPartitionMetadataMultiBrokerTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java index d4a9ca516d1f8..c6cad2d3c928c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java @@ -247,9 +247,6 @@ public Object[][] autoCreationParamsAllForNonPersistentTopic(){ }; } - /** - * {@inheritDoc} - */ @Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic") public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation, boolean paramMetadataAutoCreationEnabled, From 1ccc7fef40c9031d28a24a894f80d9e148b55949 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 10 Aug 2024 01:08:31 +0800 Subject: [PATCH 08/15] rollback -> fallback --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 4 ++-- .../apache/pulsar/client/impl/BinaryProtoLookupService.java | 2 +- .../java/org/apache/pulsar/client/impl/LookupService.java | 2 +- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 214d7d8e233bc..3c590dc0d4c0a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1501,8 +1501,8 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St // Since the feature PIP-344 was not supported, just rollback the behavior to the // original as before the fix https://github.com/apache/pulsar/pull/22838. log.info("{} Checking if a non-persistent non-partitioned topic exists was" - + " roll-backed to the original before #22838, because a broker does not" - + " support a new API. see more detail #23136", topic); + + " fall-backed to the previous behavior before #22838, because a broker" + + " does not support a new API. see more detail #23136", topic); return CompletableFuture.completedFuture(false); } else { log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 05e110ffd4374..9212dda6f241e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -260,7 +260,7 @@ private CompletableFuture getPartitionedTopicMetadata( boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { if (acceptFallbackIfNotSupport) { - log.info("{} Roll-back getPartitionedTopicMetadata(topic, false) to" + log.info("{} Fall-back getPartitionedTopicMetadata(topic, false) to" + " getPartitionedTopicMetadata(topic) since broker does not support.", topicName); finalAutoCreationEnabled = true; } else { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 13335032b6b58..4dc5cfbea452a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -80,7 +80,7 @@ default CompletableFuture getPartitionedTopicMetadata( * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. * For the result, see case 1. - * @param acceptFallbackIfNotSupport roll-back to the original method + * @param acceptFallbackIfNotSupport fall-back to the original method * {@link #getPartitionedTopicMetadata(TopicName)} if brokers do not support * "getPartitionedTopicMetadata(topic, false)". This param only affects when the * {@param metadataAutoCreationEnabled} is "false". diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 36a933c105dc2..b75986b713995 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -1132,7 +1132,7 @@ public LookupService createLookup(String url) throws PulsarClientException { } /** - * @param acceptFallbackIfNotSupport roll-back to the original method {@link #getPartitionsForTopic(String)} if + * @param acceptFallbackIfNotSupport fall-back to the original method {@link #getPartitionsForTopic(String)} if * brokers do not support "getPartitionsForTopic(topic, false)". This param only affects when the * {@param metadataAutoCreationEnabled} is "false". */ From 9078ebc950520d182fb80f47b929b3430f5d407a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 9 Aug 2024 20:21:24 +0300 Subject: [PATCH 09/15] Improve clarity of some comments, parameter names and javadocs --- .../broker/namespace/NamespaceService.java | 16 ++++++++------- .../client/impl/BinaryProtoLookupService.java | 20 ++++++++++--------- .../pulsar/client/impl/HttpLookupService.java | 4 ++-- .../pulsar/client/impl/LookupService.java | 10 +++++----- .../pulsar/client/impl/PulsarClientImpl.java | 18 +++++++++-------- 5 files changed, 37 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 3c590dc0d4c0a..58a58c946b6b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1498,18 +1498,20 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St return CompletableFuture.completedFuture(false); } else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){ if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) { - // Since the feature PIP-344 was not supported, just rollback the behavior to the - // original as before the fix https://github.com/apache/pulsar/pull/22838. - log.info("{} Checking if a non-persistent non-partitioned topic exists was" - + " fall-backed to the previous behavior before #22838, because a broker" - + " does not support a new API. see more detail #23136", topic); + // Since the feature PIP-344 isn't supported, restore the behavior to previous + // before before https://github.com/apache/pulsar/pull/22838 changes. + log.info("{} Checking the existence of a non-persistent non-partitioned topic " + + "was performed using the behavior prior to PIP-344 changes, " + + "because the broker does not support the PIP-344 feature " + + "'supports_get_partitioned_metadata_without_auto_creation'.", + topic); return CompletableFuture.completedFuture(false); } else { - log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); + log.error("{} Failed to get partition metadata", topic, ex); return CompletableFuture.failedFuture(ex); } } else { - log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); + log.error("{} Failed to get partition metadata", topic, ex); return CompletableFuture.failedFuture(ex); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 9212dda6f241e..a22d4d55841c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -148,13 +148,13 @@ public CompletableFuture getBroker(TopicName topicName) { */ @Override public CompletableFuture getPartitionedTopicMetadata( - TopicName topicName, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { final MutableObject newFutureCreated = new MutableObject<>(); try { return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> { CompletableFuture newFuture = getPartitionedTopicMetadata( serviceNameResolver.resolveHost(), topicName, metadataAutoCreationEnabled, - acceptFallbackIfNotSupport); + useFallbackForNonPIP344Brokers); newFutureCreated.setValue(newFuture); return newFuture; }); @@ -251,7 +251,7 @@ private CompletableFuture findBroker(InetSocketAddress socket } private CompletableFuture getPartitionedTopicMetadata(InetSocketAddress socketAddress, - TopicName topicName, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { long startTime = System.nanoTime(); CompletableFuture partitionFuture = new CompletableFuture<>(); @@ -259,15 +259,17 @@ private CompletableFuture getPartitionedTopicMetadata( client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { - if (acceptFallbackIfNotSupport) { - log.info("{} Fall-back getPartitionedTopicMetadata(topic, false) to" - + " getPartitionedTopicMetadata(topic) since broker does not support.", topicName); + if (useFallbackForNonPIP344Brokers) { + log.info("[{}] Using original behavior of getPartitionedTopicMetadata(topic) in " + + "getPartitionedTopicMetadata(topic, false) " + + "since the target broker does not support PIP-344 and fallback is enabled.", topicName); finalAutoCreationEnabled = true; } else { partitionFuture.completeExceptionally( - new PulsarClientException.FeatureNotSupportedException("The feature of" - + " getting partitions without auto-creation is not supported from the broker," - + " please upgrade the broker to the latest version.", + new PulsarClientException.FeatureNotSupportedException("The feature of " + + "getting partitions without auto-creation is not supported by the broker. " + + "Please upgrade the broker to version that supports PIP-344 to resolve this " + + "issue.", SupportsGetPartitionedMetadataWithoutAutoCreation)); return; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index e6266d5ce056d..4a5557fa869e4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -137,11 +137,11 @@ public CompletableFuture getBroker(TopicName topicName) { /** * {@inheritDoc} - * @param acceptFallbackIfNotSupport HttpLookupService supports every request, so this param will be ignored. + * @param useFallbackForNonPIP344Brokers HttpLookupService ignores this parameter */ @Override public CompletableFuture getPartitionedTopicMetadata( - TopicName topicName, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { + TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { long startTime = System.nanoTime(); String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 4dc5cfbea452a..13ff54c273a57 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -80,15 +80,15 @@ default CompletableFuture getPartitionedTopicMetadata( * 3.When {@param metadataAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using * the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. * For the result, see case 1. - * @param acceptFallbackIfNotSupport fall-back to the original method - * {@link #getPartitionedTopicMetadata(TopicName)} if brokers do not support - * "getPartitionedTopicMetadata(topic, false)". This param only affects when the - * {@param metadataAutoCreationEnabled} is "false". + * @param useFallbackForNonPIP344Brokers

If true, fallback to the prior behavior of the method + * {@link #getPartitionedTopicMetadata(TopicName)} if the broker does not support the PIP-344 feature + * 'supports_get_partitioned_metadata_without_auto_creation'. This parameter only affects the behavior when + * {@param metadataAutoCreationEnabled} is false.

* @version 3.3.0. */ CompletableFuture getPartitionedTopicMetadata(TopicName topicName, boolean metadataAutoCreationEnabled, - boolean acceptFallbackIfNotSupport); + boolean useFallbackForNonPIP344Brokers); /** * Returns current SchemaInfo {@link SchemaInfo} for a given topic. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index b75986b713995..d37c3a10e1607 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -1132,12 +1132,14 @@ public LookupService createLookup(String url) throws PulsarClientException { } /** - * @param acceptFallbackIfNotSupport fall-back to the original method {@link #getPartitionsForTopic(String)} if - * brokers do not support "getPartitionsForTopic(topic, false)". This param only affects when the - * {@param metadataAutoCreationEnabled} is "false". + * @param useFallbackForNonPIP344Brokers

If true, fallback to the prior behavior of the method + * getPartitionedTopicMetadata if the broker does not support the PIP-344 + * feature 'supports_get_partitioned_metadata_without_auto_creation'. This + * parameter only affects the behavior when + * {@param metadataAutoCreationEnabled} is false.

*/ public CompletableFuture getPartitionedTopicMetadata( - String topic, boolean metadataAutoCreationEnabled, boolean acceptFallbackIfNotSupport) { + String topic, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) { CompletableFuture metadataFuture = new CompletableFuture<>(); @@ -1150,7 +1152,7 @@ public CompletableFuture getPartitionedTopicMetadata( .setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .create(); getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture, new ArrayList<>(), - metadataAutoCreationEnabled, acceptFallbackIfNotSupport); + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } @@ -1163,10 +1165,10 @@ private void getPartitionedTopicMetadata(TopicName topicName, CompletableFuture future, List previousExceptions, boolean metadataAutoCreationEnabled, - boolean acceptFallbackIfNotSupport) { + boolean useFallbackForNonPIP344Brokers) { long startTime = System.nanoTime(); CompletableFuture queryFuture = lookup.getPartitionedTopicMetadata(topicName, - metadataAutoCreationEnabled, acceptFallbackIfNotSupport); + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); queryFuture.thenAccept(future::complete).exceptionally(e -> { remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); long nextDelay = Math.min(backoff.next(), remainingTime.get()); @@ -1187,7 +1189,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, + "Will try again in {} ms", topicName, nextDelay); remainingTime.addAndGet(-nextDelay); getPartitionedTopicMetadata(topicName, backoff, remainingTime, future, previousExceptions, - metadataAutoCreationEnabled, acceptFallbackIfNotSupport); + metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers); }, nextDelay, TimeUnit.MILLISECONDS); return null; }); From b3fe373d83f65967ec82273078273f6445a03ed8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 9 Aug 2024 20:32:53 +0300 Subject: [PATCH 10/15] Fix checkstyle --- .../apache/pulsar/client/impl/BinaryProtoLookupService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index a22d4d55841c1..6ee6fafde1c25 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -18,9 +18,8 @@ */ package org.apache.pulsar.client.impl; -import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck - .SupportsGetPartitionedMetadataWithoutAutoCreation; import static java.lang.String.format; +import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import io.netty.buffer.ByteBuf; import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; From 48623cee5cd4ef305388d85974bdd654ad071e4b Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 9 Aug 2024 20:34:50 +0300 Subject: [PATCH 11/15] Fix checkstyle --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 58a58c946b6b5..87dd0da53ef60 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -23,9 +23,8 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; -import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck - .SupportsGetPartitionedMetadataWithoutAutoCreation; import com.google.common.hash.Hashing; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; From 5b90c46403608a0f9cc5b8d638ae03265e7b00cc Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 9 Aug 2024 20:42:29 +0300 Subject: [PATCH 12/15] Fix typo in comment --- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 87dd0da53ef60..c2b1880d4f28f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1498,7 +1498,7 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St } else if (actEx instanceof PulsarClientException.FeatureNotSupportedException fe){ if (fe.getFailedFeatureCheck() == SupportsGetPartitionedMetadataWithoutAutoCreation) { // Since the feature PIP-344 isn't supported, restore the behavior to previous - // before before https://github.com/apache/pulsar/pull/22838 changes. + // behavior before https://github.com/apache/pulsar/pull/22838 changes. log.info("{} Checking the existence of a non-persistent non-partitioned topic " + "was performed using the behavior prior to PIP-344 changes, " + "because the broker does not support the PIP-344 feature " From 6c422d32c7344cb72b7607855153f7a342fda9fa Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 9 Aug 2024 20:45:10 +0300 Subject: [PATCH 13/15] Update roll-back -> fallback in test comments --- .../broker/admin/GetPartitionMetadataMultiBrokerTest.java | 4 ++-- .../apache/pulsar/broker/admin/GetPartitionMetadataTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java index c6cad2d3c928c..9116fa9f83506 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java @@ -270,8 +270,8 @@ public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean confi field.set(clientCnx, false); } } - // Verify: the method "getPartitionsForTopic(topic, false, true)" will be roll-backed - // to "getPartitionsForTopic(topic, true)". + // Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback + // to "getPartitionsForTopic(topic, true)" behavior. int lookupPermitsBefore = getLookupRequestPermits(); // Verify: we will not get an un-support error. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 4e6fdc0ba6784..526a417c05e79 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -250,8 +250,8 @@ public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) t } } - // Verify: the method "getPartitionsForTopic(topic, false, true)" will be roll-backed to - // "getPartitionsForTopic(topic)". + // Verify: the method "getPartitionsForTopic(topic, false, true)" will fallback to + // "getPartitionsForTopic(topic)" behavior. int lookupPermitsBefore = getLookupRequestPermits(); for (PulsarClientImpl client : clients) { // Verify: the behavior of topic creation. From 80b9ea14c205381b3278d5cf307586409d26f58d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 10 Aug 2024 14:09:36 +0800 Subject: [PATCH 14/15] address comments and fix tests --- .../pulsar/broker/namespace/NamespaceService.java | 2 +- .../admin/GetPartitionMetadataMultiBrokerTest.java | 10 ++++++++++ .../pulsar/broker/admin/GetPartitionMetadataTest.java | 10 ++++++++++ .../pulsar/broker/admin/TopicAutoCreationTest.java | 2 ++ .../buffer/TransactionLowWaterMarkTest.java | 4 ++-- .../pulsar/client/api/BrokerServiceLookupTest.java | 2 +- .../org/apache/pulsar/client/impl/ClientCnxTest.java | 2 +- .../org/apache/pulsar/client/impl/LookupService.java | 8 ++++++++ 8 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index c2b1880d4f28f..52e03fb8b9aec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1487,7 +1487,7 @@ public CompletableFuture checkNonPersistentNonPartitionedTopicExists(St brokerUrl = lookupData.getBrokerUrl(); } return pulsarClient.getLookup(brokerUrl) - .getPartitionedTopicMetadata(topicName, false, false) + .getPartitionedTopicMetadata(topicName, false) .thenApply(metadata -> true) .exceptionallyCompose(ex -> { Throwable actEx = FutureUtil.unwrapCompletionException(ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java index 9116fa9f83506..dbf70b57f53a4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java @@ -296,5 +296,15 @@ public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean confi Awaitility.await().untilAsserted(() -> { assertEquals(getLookupRequestPermits(), lookupPermitsBefore); }); + + // reset clients. + for (PulsarClientImpl client : Arrays.asList(client1, client2)) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, false); + } + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 526a417c05e79..55518ff1e5686 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -271,6 +271,16 @@ public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) t // Cleanup. admin1.topics().deletePartitionedTopic(tp, false); } + + // reset clients. + for (PulsarClientImpl client : clients) { + ConnectionPool pool = client.getCnxPool(); + for (CompletableFuture connectionFuture : pool.getConnections()) { + ClientCnx clientCnx = connectionFuture.join(); + clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); + field.set(clientCnx, true); + } + } } @DataProvider(name = "autoCreationParamsAll") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java index 354c26078af88..c90ad15242c85 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java @@ -134,6 +134,8 @@ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion() // we want to skip the "lookup" phase, because it is blocked by the HTTP API LookupService mockLookup = mock(LookupService.class); ((PulsarClientImpl) pulsarClient).setLookup(mockLookup); + when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean())).thenAnswer( + i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); when(mockLookup.getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean())).thenAnswer( i -> CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); when(mockLookup.getBroker(any())).thenAnswer(ignored -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index f7e51245136c8..2e6d9c61bde79 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -148,7 +148,7 @@ public void testTransactionBufferLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false) + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false) .get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { @@ -254,7 +254,7 @@ public void testPendingAckLowWaterMark() throws Exception { PartitionedTopicMetadata partitionedTopicMetadata = ((PulsarClientImpl) pulsarClient).getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false, false) + .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, false) .get(); Transaction lowWaterMarkTxn = null; for (int i = 0; i < partitionedTopicMetadata.partitions; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java index b15b02255e0c3..157df1185307a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java @@ -933,7 +933,7 @@ public void testMergeGetPartitionedMetadataRequests() throws Exception { // Verify the request is works after merge the requests. List> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { - futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false, false)); + futures.add(lookupService.getPartitionedTopicMetadata(TopicName.get(tpName), false)); } for (CompletableFuture future : futures) { assertEquals(future.join().partitions, topicPartitions); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 57d709e9768c3..e25212e0108f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -208,7 +208,7 @@ public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Excep clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); Assert.fail("Expected an error that the broker version is too old."); } catch (Exception ex) { - Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker")); + Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported by the broker")); } // cleanup. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 13ff54c273a57..3367ae99cb1a2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -68,6 +68,14 @@ default CompletableFuture getPartitionedTopicMetadata( return getPartitionedTopicMetadata(topicName, true, true); } + /** + * See the doc {@link #getPartitionedTopicMetadata(TopicName, boolean, boolean)}. + */ + default CompletableFuture getPartitionedTopicMetadata(TopicName topicName, + boolean metadataAutoCreationEnabled) { + return getPartitionedTopicMetadata(topicName, metadataAutoCreationEnabled, false); + } + /** * 1.Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists; * return "{partition: 0}" if a non-partitioned topic exists. From 16d270c88e84aefb449a289e07b7ede3c2bbbf23 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 10 Aug 2024 14:41:42 +0800 Subject: [PATCH 15/15] fix tests --- .../broker/admin/GetPartitionMetadataMultiBrokerTest.java | 4 ++-- .../apache/pulsar/broker/admin/GetPartitionMetadataTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java index dbf70b57f53a4..60691203e777d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java @@ -247,7 +247,7 @@ public Object[][] autoCreationParamsAllForNonPersistentTopic(){ }; } - @Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic") + @Test(dataProvider = "autoCreationParamsAllForNonPersistentTopic", priority = Integer.MAX_VALUE) public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean configAllowAutoTopicCreation, boolean paramMetadataAutoCreationEnabled, boolean isUsingHttpLookup) throws Exception { @@ -303,7 +303,7 @@ public void testCompatibilityDifferentBrokersForNonPersistentTopic(boolean confi for (CompletableFuture connectionFuture : pool.getConnections()) { ClientCnx clientCnx = connectionFuture.join(); clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation(); - field.set(clientCnx, false); + field.set(clientCnx, true); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java index 55518ff1e5686..87bc4267b48a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java @@ -229,7 +229,7 @@ public void testAutoCreatingMetadataWhenCallingOldAPI(TopicDomain topicDomain) t } } - @Test(dataProvider = "topicDomains") + @Test(dataProvider = "topicDomains", priority = Integer.MAX_VALUE) public void testCompatibilityForNewClientAndOldBroker(TopicDomain topicDomain) throws Exception { modifyTopicAutoCreation(true, TopicType.PARTITIONED, 3); // Initialize connections.