diff --git a/pip/pip-344.md b/pip/pip-344.md index 5eafc6fd5c279..120a8e1ef0d4a 100644 --- a/pip/pip-344.md +++ b/pip/pip-344.md @@ -122,6 +122,9 @@ message FeatureFlags { # Backward & Forward Compatibility -- Old version client and New version Broker: The client will call the old API. +Old version (`< 3.0.6`) client and New version (`>= 3.0.6`) Broker: The client will call the old API. -- New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false. +New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false in the following cases: +- The topic is a DLQ topic +- The topic is non-persistent +- The topic is in geo-replication that the local cluster is new version and the remote cluster is old version diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 3b30f0011e5c8..6e9bc4595a030 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -260,7 +260,7 @@ public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Excep field.set(clientCnxFuture.get(), false); } try { - clientWitBinaryLookup.getPartitionsForTopic(topic, false).join(); + clientWitBinaryLookup.getPartitionedTopicMetadata(topic, false, false).join(); Assert.fail("Expected an error that the broker version is too old."); } catch (Exception ex) { Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported by the broker")); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 6ffdfa55a9b7a..5f5239131a878 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -1291,7 +1291,7 @@ private void getPartitionedTopicMetadata(TopicName topicName, @Override public CompletableFuture> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) { - return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, false).thenApply(metadata -> { + return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, true).thenApply(metadata -> { if (metadata.partitions > 0) { TopicName topicName = TopicName.get(topic); List partitions = new ArrayList<>(metadata.partitions); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java index 2e153b4ec7fdc..9fcbf66765931 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/backwardscompatibility/ClientTest25.java @@ -19,8 +19,19 @@ package org.apache.pulsar.tests.integration.backwardscompatibility; +import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.tests.integration.topologies.ClientTestBase; +import org.testng.Assert; import org.testng.annotations.Test; public class ClientTest25 extends PulsarStandaloneTestSuite25 { @@ -34,4 +45,47 @@ public void testResetCursorCompatibility(Supplier serviceUrl, Supplier msg, TopicMetadata metadata) { + return metadata.numPartitions() - 1; + } + }) + .topic(topic) + .create(); + @Cleanup final var consumer = pulsarClient.newConsumer().autoUpdatePartitions(true) + .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS) + .topic(topic).subscriptionName("sub") + .subscribe(); + @Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer().autoUpdatePartitions(true) + .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS) + .topics(List.of(topic, topic2)).subscriptionName("sub-2").subscribe(); + + admin.topics().updatePartitionedTopic(topic, 3); + Thread.sleep(1500); + final var msgId = (MessageIdAdv) producer.send("msg".getBytes()); + Assert.assertEquals(msgId.getPartitionIndex(), 2); + + final var msg = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2); + final var msg2 = multiTopicsConsumer.receive(3, TimeUnit.SECONDS); + Assert.assertNotNull(msg2); + Assert.assertEquals(msg2.getMessageId(), msg.getMessageId()); + } }