From 8ee62c75623f92cf41c0b4ca0402d75220dcb8ea Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 23 Apr 2024 09:29:22 +0800 Subject: [PATCH 1/7] [fix] [broker] Part-2: Replicator can not created successfully due to an orphan replicator in the previous topic owner --- .../persistent/PersistentReplicator.java | 4 +- .../service/persistent/PersistentTopic.java | 56 ++++++++++++------- .../service/OneWayReplicatorTestBase.java | 14 +++++ 3 files changed, 51 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 5e1cc4a936a75..c793a7df04ba4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -450,7 +450,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof CursorAlreadyClosedException) { - log.error("[{}] Error reading entries because replicator is" + log.warn("[{}] Error reading entries because replicator is" + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be disconnected. @@ -570,7 +570,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx, exception.getMessage(), exception); if (exception instanceof CursorAlreadyClosedException) { - log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + log.warn("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be disconnected. terminate(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9d6855962ced6..3e5d1b98e4163 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1423,8 +1423,6 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, List> futures = new ArrayList<>(); subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty()))); if (closeIfClientsConnected) { - replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); producers.values().forEach(producer -> futures.add(producer.disconnect())); } FutureUtil.waitForAll(futures).thenRunAsync(() -> { @@ -1468,16 +1466,18 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - brokerService.removeTopicFromCache(PersistentTopic.this); + disconnectReplicators().thenAccept(__ -> { + brokerService.removeTopicFromCache(PersistentTopic.this); - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - unregisterTopicPolicyListener(); + unregisterTopicPolicyListener(); - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); + }); } @Override @@ -1565,8 +1565,6 @@ public CompletableFuture close( List> futures = new ArrayList<>(); futures.add(transactionBuffer.closeAsync()); - replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { @@ -1614,22 +1612,26 @@ public CompletableFuture close( ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { - if (disconnectClients) { - // Everything is now closed, remove the topic from map - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } + disconnectReplicators().thenAccept(__ -> { + if (disconnectClients) { + // Everything is now closed, remove the topic from map + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } + }); } @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - if (disconnectClients) { - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } + disconnectReplicators().thenAccept(__ -> { + if (disconnectClients) { + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } + }); } }, null); }).exceptionally(exception -> { @@ -1642,6 +1644,18 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } + private CompletableFuture disconnectReplicators() { + List> futures = new ArrayList<>(); + replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + return FutureUtil.waitForAll(futures).handle((__, ex) -> { + if (ex != null) { + log.error("[{}] Failed to close replicator, proceeding anyway.", topic, ex); + } + return null; + }); + } + private void disposeTopic(CompletableFuture closeFuture) { brokerService.removeTopicFromCache(PersistentTopic.this) .thenRun(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 181721e34aa73..8efb5ec6736b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -23,6 +23,7 @@ import java.net.URL; import java.time.Duration; import java.util.Collections; +import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; @@ -178,6 +179,19 @@ protected void waitChangeEventsInit(String namespace) { }); } + protected void cleanupNamespace(String namespace) throws Exception { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1)); + List partitionedTopics = admin1.topics().getPartitionedTopicList(namespace); + for (String partitionedTopic: partitionedTopics) { + admin1.topics().deletePartitionedTopic(partitionedTopic); + } + List nonPartitionedTopics = admin1.topics().getList(namespace); + for (String nonPartitionedTopic: nonPartitionedTopics) { + admin1.topics().delete(nonPartitionedTopic); + } + admin1.namespaces().deleteNamespace(namespace); + } + protected interface CleanupTopicAction { void run() throws Exception; } From c216c09d3002fadd39356559168641b3f894bc5c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 24 Apr 2024 13:07:48 +0800 Subject: [PATCH 2/7] make up the missing part in Part-1 --- .../broker/service/AbstractReplicator.java | 6 +++++- .../service/persistent/PersistentReplicator.java | 5 +++++ .../service/persistent/PersistentTopic.java | 16 +++++++++------- .../broker/service/OneWayReplicatorTestBase.java | 14 -------------- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index f34144deb0ab0..d7d820c37249a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -248,7 +248,7 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { } startProducer(); }).exceptionally(ex -> { - log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" + log.error("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" + " trigger a terminate. Replicator state: {}", localTopicName, replicatorId, STATE_UPDATER.get(this), ex); terminate(); @@ -377,9 +377,13 @@ public CompletableFuture terminate() { this.producer = null; // set the cursor as inactive. disableReplicatorRead(); + // release resources. + doReleaseResources(); }); } + protected void doReleaseResources() {} + protected boolean tryChangeStatusToTerminating() { if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){ return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index c793a7df04ba4..367d19652072d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -698,6 +698,11 @@ public boolean isConnected() { return producer != null && producer.isConnected(); } + @Override + protected void doReleaseResources() { + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + } + private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class); @VisibleForTesting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3e5d1b98e4163..2eaff891817cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1466,7 +1466,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - disconnectReplicators().thenAccept(__ -> { + terminateReplicators().thenAccept(__ -> { brokerService.removeTopicFromCache(PersistentTopic.this); dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); @@ -1488,7 +1488,9 @@ public void deleteLedgerComplete(Object ctx) { instanceof MetadataStoreException.NotFoundException) { log.info("[{}] Topic is already deleted {}", topic, exception.getMessage()); - deleteLedgerComplete(ctx); + terminateReplicators().thenAccept(__ -> { + deleteLedgerComplete(ctx); + }); } else { log.error("[{}] Error deleting topic", topic, exception); @@ -1612,7 +1614,7 @@ public CompletableFuture close( ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { - disconnectReplicators().thenAccept(__ -> { + terminateReplicators().thenAccept(__ -> { if (disconnectClients) { // Everything is now closed, remove the topic from map disposeTopic(closeFuture); @@ -1625,7 +1627,7 @@ public void closeComplete(Object ctx) { @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - disconnectReplicators().thenAccept(__ -> { + terminateReplicators().thenAccept(__ -> { if (disconnectClients) { disposeTopic(closeFuture); } else { @@ -1644,10 +1646,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } - private CompletableFuture disconnectReplicators() { + private CompletableFuture terminateReplicators() { List> futures = new ArrayList<>(); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); return FutureUtil.waitForAll(futures).handle((__, ex) -> { if (ex != null) { log.error("[{}] Failed to close replicator, proceeding anyway.", topic, ex); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 8efb5ec6736b5..181721e34aa73 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -23,7 +23,6 @@ import java.net.URL; import java.time.Duration; import java.util.Collections; -import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; @@ -179,19 +178,6 @@ protected void waitChangeEventsInit(String namespace) { }); } - protected void cleanupNamespace(String namespace) throws Exception { - admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1)); - List partitionedTopics = admin1.topics().getPartitionedTopicList(namespace); - for (String partitionedTopic: partitionedTopics) { - admin1.topics().deletePartitionedTopic(partitionedTopic); - } - List nonPartitionedTopics = admin1.topics().getList(namespace); - for (String nonPartitionedTopic: nonPartitionedTopics) { - admin1.topics().delete(nonPartitionedTopic); - } - admin1.namespaces().deleteNamespace(namespace); - } - protected interface CleanupTopicAction { void run() throws Exception; } From d3b00d2675d9032d495e27a901536e0baf310227 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 24 Apr 2024 15:23:46 +0800 Subject: [PATCH 3/7] fix test --- .../broker/service/OneWayReplicatorTest.java | 113 ++++++++++++++++++ .../service/OneWayReplicatorTestBase.java | 14 ++- 2 files changed, 122 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 35073575f34ed..95e3ede8c4211 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -26,12 +26,14 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Sets; import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; import java.util.Arrays; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -486,4 +488,115 @@ public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throw admin1.topics().deletePartitionedTopic(topicName); admin2.topics().deletePartitionedTopic(topicName); } + + /** + * See the description and execution flow: https://github.com/apache/pulsar/pull/21948. + * Steps: + * 1.Create topic, does not enable replication now. + * - The topic will be loaded in the memory. + * 2.Enable namespace level replication. + * - Broker creates a replicator, and the internal producer of replicator is starting. + * - We inject an error to make the internal producer fail to connect,after few seconds, it will retry to start. + * 3.Unload bundle. + * - Starting to close the topic. + * - The replicator will be closed, but it will not close the internal producer, because the producer has not + * been created successfully. + * - We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck. So the topic is still + * in the process of being closed now. + * 4.Internal producer retry to connect. + * - At the next retry, it connected successful. Since the state of "repl.cursor" is not "Closed", this producer + * will not be closed now. + * 5.Topic closed. + * - Cancel the stuck of closing the "repl.cursor". + * - The topic is wholly closed. + * 6.Verify: the delayed created internal producer will be closed. In other words, there is no producer is connected + * to the remote cluster. + */ + @Test + public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception { + final String namespaceName = defaultTenant + "/" + UUID.randomUUID().toString().replaceAll("-", ""); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespaceName + "/tp_"); + // 1.Create topic, does not enable replication now. + admin1.namespaces().createNamespace(namespaceName); + admin2.namespaces().createNamespace(namespaceName); + admin1.topics().createNonPartitionedTopic(topicName); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + + // We inject an error to make the internal producer fail to connect. + // The delay time of next retry to create producer is below: + // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s... + // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. + final AtomicInteger createProducerCounter = new AtomicInteger(); + final int failTimes = 6; + injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + if (topicName.equals(producerCnf.getTopicName())) { + // There is a switch to determine create producer successfully or not. + if (createProducerCounter.incrementAndGet() > failTimes) { + return originalProducer; + } + log.info("Retry create replicator.producer count: {}", createProducerCounter); + // Release producer and fail callback. + originalProducer.closeAsync(); + throw new RuntimeException("mock error"); + } + return originalProducer; + }); + + // 2.Enable namespace level replication. + admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1, cluster2)); + AtomicReference replicator = new AtomicReference(); + Awaitility.await().untilAsserted(() -> { + assertFalse(persistentTopic.getReplicators().isEmpty()); + replicator.set( + (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next()); + // Since we inject a producer creation error, the replicator can not start successfully. + assertFalse(replicator.get().isConnected()); + }); + + // We inject a sleeping into the progress of closing the "repl.cursor" to make it stuck, until the internal + // producer of the replicator started. + SpyCursor spyCursor = + spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName()); + CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor); + + // 3.Unload bundle: call "topic.close(false)". + // Stuck start new producer, until the state of replicator change to Stopped. + // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully. + Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + assertTrue(createProducerCounter.get() >= failTimes); + }); + CompletableFuture topicCloseFuture = persistentTopic.close(true); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + String state = String.valueOf(replicator.get().getState()); + log.error("replicator state: {}", state); + assertTrue(state.equals("Disconnected") || state.equals("Terminated")); + }); + + // 5.Delay close cursor, until "replicator.producer" create successfully. + // The next once retry time of create "replicator.producer" will be 3.2s. + Thread.sleep(4 * 1000); + log.info("Replicator.state: {}", replicator.get().getState()); + cursorCloseSignal.startClose(); + cursorCloseSignal.startCallback(); + // Wait for topic close successfully. + topicCloseFuture.join(); + + // 6. Verify there is no orphan producer on the remote cluster. + Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + assertEquals(persistentTopic2.getProducers().size(), 0); + Assert.assertFalse(replicator.get().isConnected()); + }); + + // cleanup. + cleanupTopics(namespaceName, () -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + admin1.namespaces().setNamespaceReplicationClusters(namespaceName, Sets.newHashSet(cluster1)); + admin1.namespaces().deleteNamespace(namespaceName); + admin2.namespaces().deleteNamespace(namespaceName); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 181721e34aa73..95f976f965a0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -150,12 +150,16 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { } protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception { - waitChangeEventsInit(replicatedNamespace); - admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Collections.singleton(cluster1)); - admin1.namespaces().unload(replicatedNamespace); + cleanupTopics(replicatedNamespace, cleanupTopicAction); + } + + protected void cleanupTopics(String namespace, CleanupTopicAction cleanupTopicAction) throws Exception { + waitChangeEventsInit(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Collections.singleton(cluster1)); + admin1.namespaces().unload(namespace); cleanupTopicAction.run(); - admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace, Sets.newHashSet(cluster1, cluster2)); - waitChangeEventsInit(replicatedNamespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + waitChangeEventsInit(namespace); } protected void waitChangeEventsInit(String namespace) { From 0ee09e14d22b811888b90e9d495afb996ff1daf0 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 24 Apr 2024 16:26:35 +0800 Subject: [PATCH 4/7] use new solution --- .../broker/service/AbstractReplicator.java | 4 + .../pulsar/broker/service/Replicator.java | 2 + .../service/persistent/PersistentTopic.java | 118 ++++++++++-------- 3 files changed, 73 insertions(+), 51 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index d7d820c37249a..546add6658c60 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -472,4 +472,8 @@ protected ImmutablePair compareSetAndGetState(State expect, Stat } return compareSetAndGetState(expect, update); } + + public boolean isTerminated() { + return state == State.Terminating || state == State.Terminating; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 8130b855b4e4a..5c314397da80e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -51,4 +51,6 @@ default Optional getRateLimiter() { boolean isConnected(); long getNumberOfEntriesInBacklog(); + + boolean isTerminated(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2eaff891817cd..c1a75d67e3c4e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1423,6 +1423,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, List> futures = new ArrayList<>(); subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty()))); if (closeIfClientsConnected) { + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); producers.values().forEach(producer -> futures.add(producer.disconnect())); } FutureUtil.waitForAll(futures).thenRunAsync(() -> { @@ -1466,18 +1468,16 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { @Override public void deleteLedgerComplete(Object ctx) { - terminateReplicators().thenAccept(__ -> { - brokerService.removeTopicFromCache(PersistentTopic.this); + brokerService.removeTopicFromCache(PersistentTopic.this); - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); + dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); + subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close); - unregisterTopicPolicyListener(); + unregisterTopicPolicyListener(); - log.info("[{}] Topic deleted", topic); - deleteFuture.complete(null); - }); + log.info("[{}] Topic deleted", topic); + deleteFuture.complete(null); } @Override @@ -1488,9 +1488,7 @@ public void deleteLedgerComplete(Object ctx) { instanceof MetadataStoreException.NotFoundException) { log.info("[{}] Topic is already deleted {}", topic, exception.getMessage()); - terminateReplicators().thenAccept(__ -> { - deleteLedgerComplete(ctx); - }); + deleteLedgerComplete(ctx); } else { log.error("[{}] Error deleting topic", topic, exception); @@ -1567,6 +1565,8 @@ public CompletableFuture close( List> futures = new ArrayList<>(); futures.add(transactionBuffer.closeAsync()); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { @@ -1614,26 +1614,22 @@ public CompletableFuture close( ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { - terminateReplicators().thenAccept(__ -> { - if (disconnectClients) { - // Everything is now closed, remove the topic from map - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } - }); + if (disconnectClients) { + // Everything is now closed, remove the topic from map + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } } @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - terminateReplicators().thenAccept(__ -> { - if (disconnectClients) { - disposeTopic(closeFuture); - } else { - closeFuture.complete(null); - } - }); + if (disconnectClients) { + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } } }, null); }).exceptionally(exception -> { @@ -1646,18 +1642,6 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } - private CompletableFuture terminateReplicators() { - List> futures = new ArrayList<>(); - replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); - return FutureUtil.waitForAll(futures).handle((__, ex) -> { - if (ex != null) { - log.error("[{}] Failed to close replicator, proceeding anyway.", topic, ex); - } - return null; - }); - } - private void disposeTopic(CompletableFuture closeFuture) { brokerService.removeTopicFromCache(PersistentTopic.this) .thenRun(() -> { @@ -1747,6 +1731,7 @@ public CompletableFuture checkReplication() { return deleteForcefully(); } + removeTerminatedReplicators(replicators); List> futures = new ArrayList<>(); // Check for missing replicators @@ -1785,6 +1770,8 @@ private CompletableFuture checkShadowReplication() { if (log.isDebugEnabled()) { log.debug("[{}] Checking shadow replication status, shadowTopics={}", topic, configuredShadowTopics); } + + removeTerminatedReplicators(shadowReplicators); List> futures = new ArrayList<>(); // Check for missing replicators @@ -1935,19 +1922,30 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma if (replicationClient == null) { return; } - Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { - try { - return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, - remoteCluster, brokerService, (PulsarClientImpl) replicationClient); - } catch (PulsarServerException e) { - log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); + lock.readLock().lock(); + try { + if (isClosingOrDeleting) { + // Whether is "transferring" or not, do not create new replicator. + log.info("[{}] Skip to create replicator because this topic is closing." + + " remote cluster: {}. State of transferring : {}", + topic, remoteCluster, transferring); + return; } - return null; - }); - - // clean up replicator if startup is failed - if (replicator == null) { - replicators.removeNullValue(remoteCluster); + Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { + try { + return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, + remoteCluster, brokerService, (PulsarClientImpl) replicationClient); + } catch (PulsarServerException e) { + log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); + } + return null; + }); + // clean up replicator if startup is failed + if (replicator == null) { + replicators.removeNullValue(remoteCluster); + } + } finally { + lock.readLock().unlock(); } }); } @@ -3897,9 +3895,27 @@ private void fenceTopicToCloseOrDelete() { } private void unfenceTopicToResume() { - subscriptions.values().forEach(sub -> sub.resumeAfterFence()); isFenced = false; isClosingOrDeleting = false; + subscriptions.values().forEach(sub -> sub.resumeAfterFence()); + unfenceReplicatorsToResume(); + } + + private void unfenceReplicatorsToResume() { + checkReplication(); + checkShadowReplication(); + } + + private void removeTerminatedReplicators(ConcurrentOpenHashMap replicators) { + Map terminatedReplicators = new HashMap<>(); + replicators.forEach((cluster, replicator) -> { + if (replicator.isTerminated()) { + terminatedReplicators.put(cluster, replicator); + } + }); + terminatedReplicators.entrySet().forEach(entry -> { + replicators.remove(entry.getKey(), entry.getValue()); + }); } @Override From 211f8b599b5f6c7890849f85ad1500dd8cde85d7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 24 Apr 2024 18:08:53 +0800 Subject: [PATCH 5/7] add test: testUnFenceTopicToReuse --- .../broker/service/AbstractReplicator.java | 2 +- .../broker/service/OneWayReplicatorTest.java | 55 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 546add6658c60..394fad21ae6dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -474,6 +474,6 @@ protected ImmutablePair compareSetAndGetState(State expect, Stat } public boolean isTerminated() { - return state == State.Terminating || state == State.Terminating; + return state == State.Terminating || state == State.Terminated; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 95e3ede8c4211..f008a9cdad20c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -50,6 +51,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; @@ -69,6 +71,8 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -599,4 +603,55 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception admin1.namespaces().deleteNamespace(namespaceName); admin2.namespaces().deleteNamespace(namespaceName); } + + @Test + public void testUnFenceTopicToReuse() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp"); + // Wait for replicator started. + Producer producer1 = client1.newProducer(Schema.STRING).topic(topicName).create(); + waitReplicatorStarted(topicName); + + // Inject an error to make topic close fails. + final String mockProducerName = UUID.randomUUID().toString(); + final org.apache.pulsar.broker.service.Producer mockProducer = + mock(org.apache.pulsar.broker.service.Producer.class); + doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error"))) + .when(mockProducer).disconnect(any()); + doAnswer(invocation -> CompletableFuture.failedFuture(new RuntimeException("mocked error"))) + .when(mockProducer).disconnect(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + persistentTopic.getProducers().put(mockProducerName, mockProducer); + + // Do close. + GeoPersistentReplicator replicator1 = + (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + try { + persistentTopic.close(true, false).join(); + fail("Expected close fails due to a producer close fails"); + } catch (Exception ex) { + log.info("Expected error: {}", ex.getMessage()); + } + + // Broker will call `topic.unfenceTopicToResume` if close clients fails. + // Verify: the replicator will be re-created. + Awaitility.await().untilAsserted(() -> { + assertTrue(producer1.isConnected()); + GeoPersistentReplicator replicator2 = + (GeoPersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + assertNotEquals(replicator1, replicator2); + assertFalse(replicator1.isConnected()); + assertFalse(replicator1.producer != null && replicator1.producer.isConnected()); + assertTrue(replicator2.isConnected()); + assertTrue(replicator2.producer != null && replicator2.producer.isConnected()); + }); + + // cleanup. + persistentTopic.getProducers().remove(mockProducerName, mockProducer); + producer1.close(); + cleanupTopics(() -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + } } From 5735988d2cd6a1acd75d3bab4c4d49cbf012958b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 24 Apr 2024 19:30:03 +0800 Subject: [PATCH 6/7] fix checkstyle --- .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index f008a9cdad20c..9b8b567af081b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -71,8 +71,6 @@ import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; From 3d389b1a67efa35e98bbae275d96652445b50e9f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 24 Apr 2024 23:31:32 +0800 Subject: [PATCH 7/7] fix tests --- .../java/org/apache/pulsar/broker/service/ReplicatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index a05c3468ea16e..0bfcdf563d632 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -152,7 +152,7 @@ public Object[][] partitionedTopicProvider() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } - @Test + @Test(priority = Integer.MAX_VALUE) public void testConfigChange() throws Exception { log.info("--- Starting ReplicatorTest::testConfigChange ---"); // This test is to verify that the config change on global namespace is successfully applied in broker during