Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pip/pip-344.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ message FeatureFlags {

# Backward & Forward Compatibility

- Old version client and New version Broker: The client will call the old API.
Old version (`< 3.0.6`) client and New version (`>= 3.0.6`) Broker: The client will call the old API.

- New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false.
New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false in the following cases:
- The topic is a DLQ topic
- The topic is non-persistent
- The topic is in geo-replication that the local cluster is new version and the remote cluster is old version
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Excep
field.set(clientCnxFuture.get(), false);
}
try {
clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
clientWitBinaryLookup.getPartitionedTopicMetadata(topic, false, false).join();
Assert.fail("Expected an error that the broker version is too old.");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported by the broker"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ private void getPartitionedTopicMetadata(TopicName topicName,

@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean metadataAutoCreationEnabled) {
return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, false).thenApply(metadata -> {
return getPartitionedTopicMetadata(topic, metadataAutoCreationEnabled, true).thenApply(metadata -> {
if (metadata.partitions > 0) {
TopicName topicName = TopicName.get(topic);
List<String> partitions = new ArrayList<>(metadata.partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,19 @@
package org.apache.pulsar.tests.integration.backwardscompatibility;


import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.tests.integration.topologies.ClientTestBase;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ClientTest25 extends PulsarStandaloneTestSuite25 {
Expand All @@ -34,4 +45,47 @@ public void testResetCursorCompatibility(Supplier<String> serviceUrl, Supplier<S
clientTestBase.resetCursorCompatibility(serviceUrl.get(), httpServiceUrl.get(), topicName);
}

@Test(timeOut = 20000)
public void testAutoPartitionsUpdate() throws Exception {
@Cleanup final var pulsarClient = PulsarClient.builder()
.serviceUrl(getContainer().getPlainTextServiceUrl())
.build();
final var topic = "test-auto-part-update";
final var topic2 = "dummy-topic";
@Cleanup final var admin = PulsarAdmin.builder().serviceHttpUrl(getContainer().getHttpServiceUrl()).build();
// Use 2 as the initial partition number because old version broker cannot update partitions on a topic that
// has only 1 partition.
admin.topics().createPartitionedTopic(topic, 2);
admin.topics().createPartitionedTopic(topic2, 2);
@Cleanup final var producer = pulsarClient.newProducer().autoUpdatePartitions(true)
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return metadata.numPartitions() - 1;
}
})
.topic(topic)
.create();
@Cleanup final var consumer = pulsarClient.newConsumer().autoUpdatePartitions(true)
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
.topic(topic).subscriptionName("sub")
.subscribe();
@Cleanup final var multiTopicsConsumer = pulsarClient.newConsumer().autoUpdatePartitions(true)
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
.topics(List.of(topic, topic2)).subscriptionName("sub-2").subscribe();

admin.topics().updatePartitionedTopic(topic, 3);
Thread.sleep(1500);
final var msgId = (MessageIdAdv) producer.send("msg".getBytes());
Assert.assertEquals(msgId.getPartitionIndex(), 2);

final var msg = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2);
final var msg2 = multiTopicsConsumer.receive(3, TimeUnit.SECONDS);
Assert.assertNotNull(msg2);
Assert.assertEquals(msg2.getMessageId(), msg.getMessageId());
}
}
Loading