From 0eaf58661c3b52492ce84336363b4797f8254e89 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Jun 2024 00:35:52 +0800 Subject: [PATCH 01/16] [improve][log] Improve replicator's log when partitions count between two clusters is not the same --- .../broker/service/persistent/PersistentReplicator.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 54b8993784e29..33e883ab9406a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -154,6 +154,12 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { Pair changeStateRes; changeStateRes = compareSetAndGetState(Starting, Started); if (changeStateRes.getLeft()) { + if (!(producer instanceof ProducerImpl)) { + log.error("[{}] The partitions count between two clusters is not the same, the replicator can not be" + + " created successfully: {}", replicatorId, state); + doCloseProducerAsync(producer, () -> {}); + throw new ClassCastException(producer.getClass().getName() + " can not be cast to ProducerImpl"); + } this.producer = (ProducerImpl) producer; HAVE_PENDING_READ_UPDATER.set(this, FALSE); // Trigger a new read. From b61ce17a8a339c41add9d0f48a62fff4c909fdc8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Jun 2024 10:35:21 +0800 Subject: [PATCH 02/16] - --- .../pulsar/broker/service/AbstractReplicator.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 8552a9f09e93b..b8510d37f8c26 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -184,8 +184,18 @@ public void startProducer() { } log.info("[{}] Starting replicator", replicatorId); - producerBuilder.createAsync().thenAccept(producer -> { - setProducerAndTriggerReadEntries(producer); + replicationClient.getPartitionedTopicMetadata(remoteTopicName, false) + .thenCompose(metadata -> { + if (metadata.partitions != 0) { + log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {})," + + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions, + state); + // This exception will be caught below, so it can be any typed. + throw new RuntimeException(replicatorId + "Can not replicate data to a partitioned topic."); + } + return producerBuilder.createAsync().thenAccept(producer -> { + setProducerAndTriggerReadEntries(producer); + }); }).exceptionally(ex -> { Pair setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected); if (setDisconnectedRes.getLeft()) { From 72a5d75ec7f0d6e44b0f52576ce0d2980e7d467e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Jun 2024 17:00:01 +0800 Subject: [PATCH 03/16] fix issue --- .../broker/service/AbstractReplicator.java | 8 ++++++++ .../pulsar/client/impl/PulsarClientImpl.java | 20 ++++++++++++------- .../impl/conf/ProducerConfigurationData.java | 2 ++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index b8510d37f8c26..6efc8025570c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -33,10 +33,12 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; @@ -186,6 +188,7 @@ public void startProducer() { log.info("[{}] Starting replicator", replicatorId); replicationClient.getPartitionedTopicMetadata(remoteTopicName, false) .thenCompose(metadata -> { + // If there is an exists partitioned topic on the remote cluster, report an error. if (metadata.partitions != 0) { log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {})," + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions, @@ -193,6 +196,11 @@ public void startProducer() { // This exception will be caught below, so it can be any typed. throw new RuntimeException(replicatorId + "Can not replicate data to a partitioned topic."); } + // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on + // the remote cluster. + ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; + builderImpl.getConf().setForceOnoPartitioned(true); + return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 120bdeb569c69..aed57413a5e90 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -388,15 +388,22 @@ private CompletableFuture> createProducerAsync(String topic, ProducerInterceptors interceptors) { CompletableFuture> producerCreatedFuture = new CompletableFuture<>(); - getPartitionedTopicMetadata(topic, true).thenAccept(metadata -> { + CompletableFuture partitionsFuture; + if (conf.isForceOnoPartitioned()) { + partitionsFuture = CompletableFuture.completedFuture(0); + } else { + partitionsFuture = getPartitionedTopicMetadata(topic, true).thenApply(metadata -> metadata.partitions); + } + + partitionsFuture.thenAccept(partitions -> { if (log.isDebugEnabled()) { - log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); + log.debug("[{}] Received topic metadata. partitions: {}", topic, partitions); } ProducerBase producer; - if (metadata.partitions > 0) { + if (partitions > 0) { producer = newPartitionedProducerImpl(topic, conf, schema, interceptors, producerCreatedFuture, - metadata); + partitions); } else { producer = newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture, Optional.empty()); @@ -422,7 +429,6 @@ private CompletableFuture> createProducerAsync(String topic, * @param schema topic schema * @param interceptors producer interceptors * @param producerCreatedFuture future for signaling completion of async producer creation - * @param metadata partitioned topic metadata * @param message type class * @return new PartitionedProducerImpl instance */ @@ -432,8 +438,8 @@ protected PartitionedProducerImpl newPartitionedProducerImpl(String topic ProducerInterceptors interceptors, CompletableFuture> producerCreatedFuture, - PartitionedTopicMetadata metadata) { - return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions, + int partitions) { + return new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, partitions, producerCreatedFuture, schema, interceptors); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 581b3d8a1635e..721dfb4e27ac3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -204,6 +204,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable { private SortedMap properties = new TreeMap<>(); + private boolean forceOnoPartitioned; + @ApiModelProperty( name = "initialSubscriptionName", value = "Use this configuration to automatically create an initial subscription when creating a topic." From 7254197ea1df8831d50116b72f0b2fb9dca8ddbe Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Jun 2024 20:22:44 +0800 Subject: [PATCH 04/16] add test --- .../broker/service/AbstractReplicator.java | 1 - .../broker/service/OneWayReplicatorTest.java | 30 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 6efc8025570c4..d6b6858c93581 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -33,7 +33,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 9aad26530df5b..4fd9407d1d370 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -79,9 +79,11 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; @@ -1069,4 +1071,32 @@ public void testConfigReplicationStartAt() throws Exception { admin1.topics().delete(topic3, false); admin2.topics().delete(topic3, false); } + + @Test + public void test1() throws Exception { + String ns = defaultTenant + "/ns_2"/* + UUID.randomUUID().toString().replace("-", "")*/; + admin1.namespaces().createNamespace(ns); + admin2.namespaces().createNamespace(ns); + + AutoTopicCreationOverride autoTopicCreation = + AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true) + .topicType("partitioned").defaultNumPartitions(2).build(); + admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2); + }); + + final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_"); + admin1.topics().createNonPartitionedTopic(topic1); + admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + + Producer p1 = client1.newProducer(Schema.STRING).topic(topic1).create(); + p1.send("msg-1"); + p1.close(); + + Thread.sleep(3 * 1000); + + System.out.println(pulsar1.getBrokerService().getTopics().keys()); + System.out.println(pulsar2.getBrokerService().getTopics().keys()); + } } From 26d3ce59a17172741720b65dd6830bfdfd12827f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Jun 2024 21:28:31 +0800 Subject: [PATCH 05/16] improve test --- .../broker/service/AbstractReplicator.java | 30 ++++++-- .../broker/service/OneWayReplicatorTest.java | 75 +++++++++++++++++-- 2 files changed, 92 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index d6b6858c93581..4fdbe46973d7e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -33,9 +33,11 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; @@ -185,21 +187,39 @@ public void startProducer() { } log.info("[{}] Starting replicator", replicatorId); - replicationClient.getPartitionedTopicMetadata(remoteTopicName, false) - .thenCompose(metadata -> { + CompletableFuture checkPartitionsSameFuture = new CompletableFuture<>(); + replicationClient.getPartitionedTopicMetadata(remoteTopicName, false).thenAccept(metadata -> { // If there is an exists partitioned topic on the remote cluster, report an error. if (metadata.partitions != 0) { log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {})," - + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions, + + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions, state); // This exception will be caught below, so it can be any typed. - throw new RuntimeException(replicatorId + "Can not replicate data to a partitioned topic."); + checkPartitionsSameFuture.completeExceptionally(new RuntimeException(replicatorId + + "Can not replicate data to a partitioned topic.")); + } else { + checkPartitionsSameFuture.complete(null); + } + }).exceptionally(ex -> { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof PulsarClientException.NotFoundException + || actEx instanceof PulsarClientException.TopicDoesNotExistException + || actEx instanceof PulsarAdminException.NotFoundException) { + // These 3 error means the topic has not been created on the remote cluster yet, and the current + // replicator will trigger an event to create it. So it is okay. + checkPartitionsSameFuture.complete(null); + } else { + log.warn("[{}] Failed to create remote producer due to get partitioned metadata failed", + replicatorId, ex); + checkPartitionsSameFuture.completeExceptionally(ex); } + return null; + }); + checkPartitionsSameFuture.thenCompose(metadata -> { // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on // the remote cluster. ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; builderImpl.getConf().setForceOnoPartitioned(true); - return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 4fd9407d1d370..a0ad7c8ec04df 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -40,6 +40,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -50,7 +51,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Data; import lombok.SneakyThrows; @@ -1072,12 +1075,28 @@ public void testConfigReplicationStartAt() throws Exception { admin2.topics().delete(topic3, false); } - @Test - public void test1() throws Exception { + @DataProvider(name = "replicationModes") + public Object[][] replicationModes() { + return new Object[][]{ + {ReplicationMode.OneWay}, + {ReplicationMode.DoubleWay} + }; + } + + enum ReplicationMode { + OneWay, + DoubleWay; + } + + @Test(dataProvider = "replicationModes") + public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception { String ns = defaultTenant + "/ns_2"/* + UUID.randomUUID().toString().replace("-", "")*/; admin1.namespaces().createNamespace(ns); admin2.namespaces().createNamespace(ns); + // Set topic auto-creation rule. + // c1: no-partitioned topic + // c2: partitioned topic with 2 partitions. AutoTopicCreationOverride autoTopicCreation = AutoTopicCreationOverrideImpl.builder().allowAutoTopicCreation(true) .topicType("partitioned").defaultNumPartitions(2).build(); @@ -1086,17 +1105,57 @@ public void test1() throws Exception { assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2); }); - final String topic1 = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_"); - admin1.topics().createNonPartitionedTopic(topic1); + // Create non-partitioned topic. + // Enable replication. + final String tp = BrokerTestUtil.newUniqueName("persistent://" + ns + "/tp_"); + admin1.topics().createNonPartitionedTopic(tp); admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + if (replicationMode.equals(ReplicationMode.DoubleWay)) { + admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1, cluster2))); + } - Producer p1 = client1.newProducer(Schema.STRING).topic(topic1).create(); + // Trigger and wait for replicator starts. + Producer p1 = client1.newProducer(Schema.STRING).topic(tp).create(); p1.send("msg-1"); p1.close(); + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get(); + assertFalse(persistentTopic.getReplicators().isEmpty()); + }); - Thread.sleep(3 * 1000); + // Verify: the topics are the same between two clusters. + Predicate topicNameFilter = t -> { + TopicName topicName = TopicName.get(t); + if (!topicName.getNamespace().equals(ns)) { + return false; + } + return t.startsWith(tp); + }; + Awaitility.await().untilAsserted(() -> { + List topics1 = pulsar1.getBrokerService().getTopics().keys() + .stream().filter(topicNameFilter).collect(Collectors.toList()); + List topics2 = pulsar2.getBrokerService().getTopics().keys() + .stream().filter(topicNameFilter).collect(Collectors.toList()); + Collections.sort(topics1); + Collections.sort(topics2); + assertEquals(topics1, topics2); + }); - System.out.println(pulsar1.getBrokerService().getTopics().keys()); - System.out.println(pulsar2.getBrokerService().getTopics().keys()); + // cleanup. + admin1.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster1))); + if (replicationMode.equals(ReplicationMode.DoubleWay)) { + admin2.namespaces().setNamespaceReplicationClusters(ns, new HashSet<>(Arrays.asList(cluster2))); + } + Awaitility.await().untilAsserted(() -> { + PersistentTopic persistentTopic = (PersistentTopic) broker1.getTopic(tp, false).join().get(); + assertTrue(persistentTopic.getReplicators().isEmpty()); + if (replicationMode.equals(ReplicationMode.DoubleWay)) { + assertTrue(persistentTopic.getReplicators().isEmpty()); + } + }); + admin1.topics().delete(tp, false); + admin2.topics().delete(tp, false); + admin1.namespaces().deleteNamespace(ns); + admin2.namespaces().deleteNamespace(ns); } } From d843f30c9040c06df2c4c553e6d5014f6ba845ce Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 27 Jun 2024 22:05:54 +0800 Subject: [PATCH 06/16] fix test --- .../apache/pulsar/broker/service/OneWayReplicatorTest.java | 2 +- .../broker/service/OneWayReplicatorUsingGlobalZKTest.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index a0ad7c8ec04df..f985c8cb3a711 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1083,7 +1083,7 @@ public Object[][] replicationModes() { }; } - enum ReplicationMode { + protected enum ReplicationMode { OneWay, DoubleWay; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 31e94f435f0f6..34810bbe9057b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -161,4 +161,10 @@ public void testConfigReplicationStartAt() throws Exception { pulsar1.getConfiguration().getReplicationStartAt().equalsIgnoreCase("latest"); }); } + + @Test(enabled = false) + @Override + public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception { + super.testDifferentTopicCreationRule(replicationMode); + } } From 7c0d8119227524621e81453946ec2ee77f5529a0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 6 Jul 2024 03:11:24 +0800 Subject: [PATCH 07/16] address comments --- .../org/apache/pulsar/broker/service/AbstractReplicator.java | 2 +- .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 2 +- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- .../pulsar/client/impl/conf/ProducerConfigurationData.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 4fdbe46973d7e..4ba1b5afe34ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -219,7 +219,7 @@ public void startProducer() { // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on // the remote cluster. ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; - builderImpl.getConf().setForceOnoPartitioned(true); + builderImpl.getConf().setForceNoPartitioned(true); return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index f985c8cb3a711..c42d4df417188 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1090,7 +1090,7 @@ protected enum ReplicationMode { @Test(dataProvider = "replicationModes") public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception { - String ns = defaultTenant + "/ns_2"/* + UUID.randomUUID().toString().replace("-", "")*/; + String ns = defaultTenant + "/" + UUID.randomUUID().toString().replace("-", ""); admin1.namespaces().createNamespace(ns); admin2.namespaces().createNamespace(ns); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index aed57413a5e90..c947bf1246c90 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -389,7 +389,7 @@ private CompletableFuture> createProducerAsync(String topic, CompletableFuture> producerCreatedFuture = new CompletableFuture<>(); CompletableFuture partitionsFuture; - if (conf.isForceOnoPartitioned()) { + if (conf.isForceNoPartitioned()) { partitionsFuture = CompletableFuture.completedFuture(0); } else { partitionsFuture = getPartitionedTopicMetadata(topic, true).thenApply(metadata -> metadata.partitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 721dfb4e27ac3..4551c5dce1600 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -204,7 +204,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable { private SortedMap properties = new TreeMap<>(); - private boolean forceOnoPartitioned; + private boolean forceNoPartitioned; @ApiModelProperty( name = "initialSubscriptionName", From 853904aa7a328e531c6d2d2cc04d1d6febcd0110 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sun, 7 Jul 2024 21:49:32 +0800 Subject: [PATCH 08/16] address comments --- .../pulsar/client/impl/PulsarClientImpl.java | 46 +++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index c947bf1246c90..f22befc000034 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -49,9 +49,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; import lombok.Builder; import lombok.Getter; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; @@ -382,33 +384,61 @@ public CompletableFuture> createProducerAsync(ProducerConfigurat } + private CompletableFuture checkPartitions(String topic, boolean forceNoPartitioned, + @Nullable String producerNameForLog) { + CompletableFuture checkPartitions = new CompletableFuture<>(); + getPartitionedTopicMetadata(topic, !forceNoPartitioned).thenAccept(metadata -> { + if (forceNoPartitioned && metadata.partitions > 0) { + String errorMsg = String.format("Can not create the producer[{}] for the topic[{}] that contains {}" + + " partitions, but the producer does not support for a partitioned topic.", + producerNameForLog, topic, metadata.partitions); + checkPartitions.completeExceptionally( + new PulsarClientException.NotConnectedException(errorMsg)); + } else { + checkPartitions.complete(metadata.partitions); + } + }).exceptionally(ex -> { + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (forceNoPartitioned && actEx instanceof PulsarClientException.NotFoundException + || actEx instanceof PulsarClientException.TopicDoesNotExistException + || actEx instanceof PulsarAdminException.NotFoundException) { + checkPartitions.complete(0); + } else { + checkPartitions.completeExceptionally(ex); + } + return null; + }); + return checkPartitions; + } + private CompletableFuture> createProducerAsync(String topic, ProducerConfigurationData conf, Schema schema, ProducerInterceptors interceptors) { CompletableFuture> producerCreatedFuture = new CompletableFuture<>(); - CompletableFuture partitionsFuture; - if (conf.isForceNoPartitioned()) { - partitionsFuture = CompletableFuture.completedFuture(0); - } else { - partitionsFuture = getPartitionedTopicMetadata(topic, true).thenApply(metadata -> metadata.partitions); - } - partitionsFuture.thenAccept(partitions -> { + + checkPartitions(topic, conf.isForceNoPartitioned(), conf.getProducerName()).thenAccept(partitions -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, partitions); } ProducerBase producer; if (partitions > 0) { + if (conf.isForceNoPartitioned()) { + String errorMsg = String.format("Can not create the producer[{}] for the topic[{}] that contains {}" + + " partitions, but the producer does not support for a partitioned topic.", + conf.getProducerName(), topic, partitions); + producerCreatedFuture.completeExceptionally( + new PulsarClientException.NotConnectedException(errorMsg)); + } producer = newPartitionedProducerImpl(topic, conf, schema, interceptors, producerCreatedFuture, partitions); } else { producer = newProducerImpl(topic, -1, conf, schema, interceptors, producerCreatedFuture, Optional.empty()); } - producers.add(producer); }).exceptionally(ex -> { log.warn("[{}] Failed to get partitioned topic metadata: {}", topic, ex.getMessage()); From 1ceee529873b9209169370bda9dfbcc72d4bd079 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 9 Jul 2024 00:01:18 +0800 Subject: [PATCH 09/16] add more tests --- .../broker/service/AbstractReplicator.java | 2 +- .../api/NonPartitionedTopicExpectedTest.java | 118 ++++++++++++++++++ .../pulsar/client/impl/PulsarClientImpl.java | 4 +- .../impl/conf/ProducerConfigurationData.java | 2 +- 4 files changed, 122 insertions(+), 4 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 4ba1b5afe34ca..c8218c13cd13e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -219,7 +219,7 @@ public void startProducer() { // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on // the remote cluster. ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; - builderImpl.getConf().setForceNoPartitioned(true); + builderImpl.getConf().setNonPartitionedTopicExpected(true); return producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java new file mode 100644 index 0000000000000..7b0edd314d055 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPartitionedTopicExpectedTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.TopicType; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +public class NonPartitionedTopicExpectedTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testWhenNonPartitionedTopicExists() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topic); + ProducerBuilderImpl producerBuilder = + (ProducerBuilderImpl) pulsarClient.newProducer(Schema.STRING).topic(topic); + producerBuilder.getConf().setNonPartitionedTopicExpected(true); + // Verify: create successfully. + Producer producer = producerBuilder.create(); + // cleanup. + producer.close(); + admin.topics().delete(topic, false); + } + + @Test + public void testWhenPartitionedTopicExists() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createPartitionedTopic(topic, 2); + ProducerBuilderImpl producerBuilder = + (ProducerBuilderImpl) pulsarClient.newProducer(Schema.STRING).topic(topic); + producerBuilder.getConf().setNonPartitionedTopicExpected(true); + // Verify: failed to create. + try { + producerBuilder.create(); + Assert.fail("expected an error since producer expected a non-partitioned topic"); + } catch (Exception ex) { + // expected an error. + log.error("expected error", ex); + } + // cleanup. + admin.topics().deletePartitionedTopic(topic, false); + } + + @DataProvider(name = "topicTypes") + public Object[][] topicTypes() { + return new Object[][]{ + {TopicType.PARTITIONED}, + {TopicType.NON_PARTITIONED} + }; + } + + @Test(dataProvider = "topicTypes") + public void testWhenTopicNotExists(TopicType topicType) throws Exception { + final String namespace = "public/default"; + final String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/tp"); + final TopicName topicName = TopicName.get(topic); + AutoTopicCreationOverride.Builder policyBuilder = AutoTopicCreationOverride.builder() + .topicType(topicType.toString()).allowAutoTopicCreation(true); + if (topicType.equals(TopicType.PARTITIONED)) { + policyBuilder.defaultNumPartitions(2); + } + AutoTopicCreationOverride policy = policyBuilder.build(); + admin.namespaces().setAutoTopicCreation(namespace, policy); + + ProducerBuilderImpl producerBuilder = + (ProducerBuilderImpl) pulsarClient.newProducer(Schema.STRING).topic(topic); + producerBuilder.getConf().setNonPartitionedTopicExpected(true); + // Verify: create successfully. + Producer producer = producerBuilder.create(); + // Verify: only create non-partitioned topic. + Assert.assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + .partitionedTopicExists(topicName)); + Assert.assertTrue(pulsar.getNamespaceService().checkNonPartitionedTopicExists(topicName).join()); + + // cleanup. + producer.close(); + admin.topics().delete(topic, false); + admin.namespaces().removeAutoTopicCreation(namespace); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index f22befc000034..e67f37eb1d428 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -419,14 +419,14 @@ private CompletableFuture> createProducerAsync(String topic, - checkPartitions(topic, conf.isForceNoPartitioned(), conf.getProducerName()).thenAccept(partitions -> { + checkPartitions(topic, conf.isNonPartitionedTopicExpected(), conf.getProducerName()).thenAccept(partitions -> { if (log.isDebugEnabled()) { log.debug("[{}] Received topic metadata. partitions: {}", topic, partitions); } ProducerBase producer; if (partitions > 0) { - if (conf.isForceNoPartitioned()) { + if (conf.isNonPartitionedTopicExpected()) { String errorMsg = String.format("Can not create the producer[{}] for the topic[{}] that contains {}" + " partitions, but the producer does not support for a partitioned topic.", conf.getProducerName(), topic, partitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java index 4551c5dce1600..6ec738bbf4c8d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java @@ -204,7 +204,7 @@ public class ProducerConfigurationData implements Serializable, Cloneable { private SortedMap properties = new TreeMap<>(); - private boolean forceNoPartitioned; + private boolean isNonPartitionedTopicExpected; @ApiModelProperty( name = "initialSubscriptionName", From 814b6781b891c19d1f9fd4240f3ef4462a98b42e Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 13:59:26 +0800 Subject: [PATCH 10/16] fix bug --- .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e67f37eb1d428..bf67938dc5b0e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -432,6 +432,7 @@ private CompletableFuture> createProducerAsync(String topic, conf.getProducerName(), topic, partitions); producerCreatedFuture.completeExceptionally( new PulsarClientException.NotConnectedException(errorMsg)); + return; } producer = newPartitionedProducerImpl(topic, conf, schema, interceptors, producerCreatedFuture, partitions); From efccc697f478ca81c8a1ebdef105d737dc88fc54 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 14:05:03 +0800 Subject: [PATCH 11/16] remove unnecessary changes --- .../broker/service/AbstractReplicator.java | 44 +++---------------- .../broker/service/OneWayReplicatorTest.java | 2 - 2 files changed, 6 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index c8218c13cd13e..424263720f012 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -33,11 +33,9 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; @@ -187,42 +185,12 @@ public void startProducer() { } log.info("[{}] Starting replicator", replicatorId); - CompletableFuture checkPartitionsSameFuture = new CompletableFuture<>(); - replicationClient.getPartitionedTopicMetadata(remoteTopicName, false).thenAccept(metadata -> { - // If there is an exists partitioned topic on the remote cluster, report an error. - if (metadata.partitions != 0) { - log.error("[{}] The partitions count between two clusters is not the same(remote partitions: {})," - + " the replicator can not be created successfully: {}", replicatorId, metadata.partitions, - state); - // This exception will be caught below, so it can be any typed. - checkPartitionsSameFuture.completeExceptionally(new RuntimeException(replicatorId - + "Can not replicate data to a partitioned topic.")); - } else { - checkPartitionsSameFuture.complete(null); - } - }).exceptionally(ex -> { - Throwable actEx = FutureUtil.unwrapCompletionException(ex); - if (actEx instanceof PulsarClientException.NotFoundException - || actEx instanceof PulsarClientException.TopicDoesNotExistException - || actEx instanceof PulsarAdminException.NotFoundException) { - // These 3 error means the topic has not been created on the remote cluster yet, and the current - // replicator will trigger an event to create it. So it is okay. - checkPartitionsSameFuture.complete(null); - } else { - log.warn("[{}] Failed to create remote producer due to get partitioned metadata failed", - replicatorId, ex); - checkPartitionsSameFuture.completeExceptionally(ex); - } - return null; - }); - checkPartitionsSameFuture.thenCompose(metadata -> { - // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on - // the remote cluster. - ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; - builderImpl.getConf().setNonPartitionedTopicExpected(true); - return producerBuilder.createAsync().thenAccept(producer -> { - setProducerAndTriggerReadEntries(producer); - }); + // Force only replicate messages to a non-partitioned topic, to avoid auto-create a partitioned topic on + // the remote cluster. + ProducerBuilderImpl builderImpl = (ProducerBuilderImpl) producerBuilder; + builderImpl.getConf().setNonPartitionedTopicExpected(true); + producerBuilder.createAsync().thenAccept(producer -> { + setProducerAndTriggerReadEntries(producer); }).exceptionally(ex -> { Pair setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected); if (setDisconnectedRes.getLeft()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index c42d4df417188..ed2cc7848682c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1155,7 +1155,5 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro }); admin1.topics().delete(tp, false); admin2.topics().delete(tp, false); - admin1.namespaces().deleteNamespace(ns); - admin2.namespaces().deleteNamespace(ns); } } From 7ec9ceb0a4c81d9e511f5c4cb2901f9a54fbf321 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 14:11:30 +0800 Subject: [PATCH 12/16] remove unnecessary changes --- .../org/apache/pulsar/client/impl/PulsarClientImpl.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index bf67938dc5b0e..2b29253ec3116 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -392,6 +392,7 @@ private CompletableFuture checkPartitions(String topic, boolean forceNo String errorMsg = String.format("Can not create the producer[{}] for the topic[{}] that contains {}" + " partitions, but the producer does not support for a partitioned topic.", producerNameForLog, topic, metadata.partitions); + log.error(errorMsg); checkPartitions.completeExceptionally( new PulsarClientException.NotConnectedException(errorMsg)); } else { @@ -426,14 +427,6 @@ private CompletableFuture> createProducerAsync(String topic, ProducerBase producer; if (partitions > 0) { - if (conf.isNonPartitionedTopicExpected()) { - String errorMsg = String.format("Can not create the producer[{}] for the topic[{}] that contains {}" - + " partitions, but the producer does not support for a partitioned topic.", - conf.getProducerName(), topic, partitions); - producerCreatedFuture.completeExceptionally( - new PulsarClientException.NotConnectedException(errorMsg)); - return; - } producer = newPartitionedProducerImpl(topic, conf, schema, interceptors, producerCreatedFuture, partitions); } else { From 879c1b4192b62a5d6400fc1e5233eea75f0a06ca Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 14:30:42 +0800 Subject: [PATCH 13/16] fix log's bug --- .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 2 ++ .../java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index ed2cc7848682c..d56bc5a0d5e71 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1103,6 +1103,8 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro admin2.namespaces().setAutoTopicCreation(ns, autoTopicCreation); Awaitility.await().untilAsserted(() -> { assertEquals(admin2.namespaces().getAutoTopicCreationAsync(ns).join().getDefaultNumPartitions(), 2); + // Trigger system topic __change_event's initialize. + pulsar2.getTopicPoliciesService().getTopicPoliciesAsync(TopicName.get("persistent://" + ns + "/1")); }); // Create non-partitioned topic. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 2b29253ec3116..4585b5328129b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -389,7 +389,7 @@ private CompletableFuture checkPartitions(String topic, boolean forceNo CompletableFuture checkPartitions = new CompletableFuture<>(); getPartitionedTopicMetadata(topic, !forceNoPartitioned).thenAccept(metadata -> { if (forceNoPartitioned && metadata.partitions > 0) { - String errorMsg = String.format("Can not create the producer[{}] for the topic[{}] that contains {}" + String errorMsg = String.format("Can not create the producer[%s] for the topic[%s] that contains %s" + " partitions, but the producer does not support for a partitioned topic.", producerNameForLog, topic, metadata.partitions); log.error(errorMsg); From 9c1549848de8f1133d1973114fef37e473f0d2e1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 14:33:46 +0800 Subject: [PATCH 14/16] cleanup the resource that the test created --- .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index d56bc5a0d5e71..1745d4dc90f3b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -1157,5 +1157,7 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro }); admin1.topics().delete(tp, false); admin2.topics().delete(tp, false); + admin1.namespaces().deleteNamespace(ns); + admin2.namespaces().deleteNamespace(ns); } } From 6f20dc111469a06b8494cc8b13690949ac8090d2 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 17:36:20 +0800 Subject: [PATCH 15/16] fix test --- .../pulsar/broker/service/AbstractReplicatorTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 64d3088b20622..09a1cefb682d8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -42,7 +42,9 @@ import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; @@ -71,7 +73,8 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { when(localClient.getCnxPool()).thenReturn(connectionPool); final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class); when(remoteClient.getCnxPool()).thenReturn(connectionPool); - final ProducerBuilder producerBuilder = mock(ProducerBuilder.class); + final ProducerConfigurationData producerConf = new ProducerConfigurationData(); + final ProducerBuilderImpl producerBuilder = mock(ProducerBuilderImpl.class); final ConcurrentOpenHashMap>> topics = new ConcurrentOpenHashMap<>(); when(broker.executor()).thenReturn(eventLoopGroup); when(broker.getTopics()).thenReturn(topics); @@ -87,6 +90,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder); when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder); when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder); + when(producerBuilder.getConf()).thenReturn(producerConf); // Mock create producer fail. when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex")); when(producerBuilder.createAsync()) From ad95488ac20c5a613dea8b5478570c7c9da1c6d4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 15 Jul 2024 20:30:28 +0800 Subject: [PATCH 16/16] checkstyle --- .../org/apache/pulsar/broker/service/AbstractReplicatorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 09a1cefb682d8..7415a40ad5553 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -39,7 +39,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ProducerBuilderImpl;