From 378f07b23c883050d19fc1641807f8462a8ded25 Mon Sep 17 00:00:00 2001 From: hanmz Date: Tue, 19 Sep 2023 14:18:45 +0800 Subject: [PATCH 1/8] [fix][broker] release orphan replicator after replicator is disconnected. --- .../broker/service/AbstractReplicator.java | 11 ++++ .../service/AbstractReplicatorTest.java | 62 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 6dd296d16b53b..98532901f743a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; @@ -62,6 +63,8 @@ public abstract class AbstractReplicator { protected final String replicatorPrefix; + protected final AtomicBoolean isDisconnected = new AtomicBoolean(false); + protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); private volatile State state = State.Stopped; @@ -116,6 +119,11 @@ public String getRemoteCluster() { // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer // the end result can be disconnect. public synchronized void startProducer() { + if (isDisconnected.get()) { + log.info("[{}] Do not start replicator because of replicator is disconnected.", replicatorId); + return; + } + if (STATE_UPDATER.get(this) == State.Stopping) { long waitTimeMs = backOff.next(); if (log.isDebugEnabled()) { @@ -227,6 +235,9 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return disconnectFuture; } + log.info("[{}] Set replicator is disconnected.", replicatorId); + isDisconnected.set(true); + if (STATE_UPDATER.get(this) == State.Stopping) { // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by // closeProducerAsync() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index f8034c37971cc..762d8d86e4896 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -112,6 +112,68 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { }); } + @Test + public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Exception { + final String localCluster = "localCluster"; + final String remoteCluster = "remoteCluster"; + final String topicName = "remoteTopicName"; + final String replicatorPrefix = "pulsar.repl"; + final DefaultEventLoop eventLoopGroup = new DefaultEventLoop(); + // Mock services. + final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class); + final PulsarService pulsar = mock(PulsarService.class); + final BrokerService broker = mock(BrokerService.class); + final Topic localTopic = mock(Topic.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + final PulsarClientImpl localClient = mock(PulsarClientImpl.class); + when(localClient.getCnxPool()).thenReturn(connectionPool); + final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class); + when(remoteClient.getCnxPool()).thenReturn(connectionPool); + final ProducerBuilder producerBuilder = mock(ProducerBuilder.class); + final ConcurrentOpenHashMap>> topics = new ConcurrentOpenHashMap<>(); + topics.put(topicName, CompletableFuture.completedFuture(Optional.of(localTopic))); + when(broker.executor()).thenReturn(eventLoopGroup); + when(broker.getTopics()).thenReturn(topics); + when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder); + when(broker.pulsar()).thenReturn(pulsar); + when(pulsar.getClient()).thenReturn(localClient); + when(pulsar.getConfiguration()).thenReturn(pulsarConfig); + when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100); + when(localTopic.getName()).thenReturn(topicName); + when(producerBuilder.topic(any())).thenReturn(producerBuilder); + when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder); + when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder); + when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder); + when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder); + when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder); + // Mock create producer fail. + when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex")); + when(producerBuilder.createAsync()) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("mocked ex"))); + // Make race condition: "retry start producer" and "close replicator". + final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, + replicatorPrefix, broker, remoteClient); + replicator.startProducer(); + replicator.disconnect(); + + // Verify task will done. + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + AtomicInteger taskCounter = new AtomicInteger(); + CountDownLatch checkTaskFinished = new CountDownLatch(1); + eventLoopGroup.execute(() -> { + synchronized (replicator) { + LinkedBlockingQueue taskQueue = WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue"); + DefaultPriorityQueue scheduledTaskQueue = + WhiteboxImpl.getInternalState(eventLoopGroup, "scheduledTaskQueue"); + taskCounter.set(taskQueue.size() + scheduledTaskQueue.size()); + checkTaskFinished.countDown(); + } + }); + checkTaskFinished.await(); + Assert.assertEquals(taskCounter.get(), 0); + }); + } + private static class ReplicatorInTest extends AbstractReplicator { public ReplicatorInTest(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, From 8666dcf47296e89d1b7cb23adc34340244232bef Mon Sep 17 00:00:00 2001 From: hanmz Date: Tue, 19 Sep 2023 18:52:19 +0800 Subject: [PATCH 2/8] [fix][broker] release orphan replicator after replicator is closed. --- .../broker/service/AbstractReplicator.java | 16 ++++++++-------- .../apache/pulsar/broker/service/Replicator.java | 3 +++ .../service/persistent/PersistentTopic.java | 10 ++++++++-- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 98532901f743a..03e23cfee5dad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -21,7 +21,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; @@ -63,8 +62,6 @@ public abstract class AbstractReplicator { protected final String replicatorPrefix; - protected final AtomicBoolean isDisconnected = new AtomicBoolean(false); - protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); private volatile State state = State.Stopped; @@ -73,6 +70,8 @@ protected enum State { Stopped, Starting, Started, Stopping } + private volatile boolean isClosed = false; + public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { @@ -119,8 +118,8 @@ public String getRemoteCluster() { // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer // the end result can be disconnect. public synchronized void startProducer() { - if (isDisconnected.get()) { - log.info("[{}] Do not start replicator because of replicator is disconnected.", replicatorId); + if (isClosed) { + log.info("[{}] Do not start replicator because of replicator is already closed.", replicatorId); return; } @@ -235,9 +234,6 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return disconnectFuture; } - log.info("[{}] Set replicator is disconnected.", replicatorId); - isDisconnected.set(true); - if (STATE_UPDATER.get(this) == State.Stopping) { // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by // closeProducerAsync() @@ -254,6 +250,10 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return closeProducerAsync(); } + public void close() { + isClosed = true; + } + public CompletableFuture remove() { // No-op return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 482fa2cbd2300..d09dbfa0b72f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -51,4 +51,7 @@ default Optional getRateLimiter() { boolean isConnected(); long getNumberOfEntriesInBacklog(); + + default void close() { + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index dfeb03a254698..0ae496d364cec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1826,7 +1826,10 @@ CompletableFuture removeReplicator(String remoteCluster) { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override public void deleteCursorComplete(Object ctx) { - replicators.remove(remoteCluster); + Replicator replicator = replicators.remove(remoteCluster); + if (replicator != null) { + replicator.close(); + } future.complete(null); } @@ -1898,7 +1901,10 @@ CompletableFuture removeShadowReplicator(String shadowTopic) { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override public void deleteCursorComplete(Object ctx) { - shadowReplicators.remove(shadowTopic); + Replicator replicator = shadowReplicators.remove(shadowTopic); + if (replicator != null) { + replicator.close(); + } future.complete(null); } From 837b630825c42eaa3686eb0134df278e43ee7603 Mon Sep 17 00:00:00 2001 From: hanmz Date: Tue, 19 Sep 2023 18:54:42 +0800 Subject: [PATCH 3/8] [fix][broker] release orphan replicator after replicator is closed. --- .../apache/pulsar/broker/service/AbstractReplicatorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 762d8d86e4896..3b077233549e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -155,6 +155,8 @@ public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Excepti replicatorPrefix, broker, remoteClient); replicator.startProducer(); replicator.disconnect(); + // Mock close replicator. + replicator.close(); // Verify task will done. Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { From 3864737d1c8283343c66d2f2f982173db42d7906 Mon Sep 17 00:00:00 2001 From: hanmz Date: Wed, 20 Sep 2023 11:46:54 +0800 Subject: [PATCH 4/8] [fix][broker] release orphan replicator after replicator is closed. --- .../pulsar/broker/service/AbstractReplicator.java | 3 ++- .../apache/pulsar/broker/service/Replicator.java | 3 ++- .../broker/service/persistent/PersistentTopic.java | 14 ++++---------- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 03e23cfee5dad..716009873928a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -250,8 +250,9 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return closeProducerAsync(); } - public void close() { + public CompletableFuture close() { isClosed = true; + return disconnect(); } public CompletableFuture remove() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index d09dbfa0b72f3..663c103f3b0ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -52,6 +52,7 @@ default Optional getRateLimiter() { long getNumberOfEntriesInBacklog(); - default void close() { + default CompletableFuture close() { + return CompletableFuture.completedFuture(null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0ae496d364cec..d5aa1e5d2f405 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1821,15 +1821,12 @@ CompletableFuture removeReplicator(String remoteCluster) { String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect) + Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::close) .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override public void deleteCursorComplete(Object ctx) { - Replicator replicator = replicators.remove(remoteCluster); - if (replicator != null) { - replicator.close(); - } + replicators.remove(remoteCluster); future.complete(null); } @@ -1896,15 +1893,12 @@ CompletableFuture removeShadowReplicator(String shadowTopic) { log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic); final CompletableFuture future = new CompletableFuture<>(); String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic); - shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> { + shadowReplicators.get(shadowTopic).close().thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override public void deleteCursorComplete(Object ctx) { - Replicator replicator = shadowReplicators.remove(shadowTopic); - if (replicator != null) { - replicator.close(); - } + shadowReplicators.remove(shadowTopic); future.complete(null); } From bba2d6c22794b0ffc6b39cad57d7c06eb874f2fc Mon Sep 17 00:00:00 2001 From: hanmz Date: Wed, 20 Sep 2023 12:17:10 +0800 Subject: [PATCH 5/8] [fix][broker] release orphan replicator after replicator is closed. --- .../apache/pulsar/broker/service/AbstractReplicatorTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 3b077233549e7..88529b3709fd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -154,8 +154,7 @@ public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Excepti final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, replicatorPrefix, broker, remoteClient); replicator.startProducer(); - replicator.disconnect(); - // Mock close replicator. + // Close replicator. replicator.close(); // Verify task will done. From eea28cac3737cddcaf35f4a7847794bb56982812 Mon Sep 17 00:00:00 2001 From: hanmz Date: Fri, 22 Sep 2023 14:51:12 +0800 Subject: [PATCH 6/8] [fix][broker] release orphan replicator after replicator is closed. --- .../broker/service/AbstractReplicator.java | 22 ++++++++++--------- .../service/persistent/PersistentTopic.java | 4 ++-- .../service/AbstractReplicatorTest.java | 3 +-- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 716009873928a..2b3d3fb025bde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -118,11 +118,11 @@ public String getRemoteCluster() { // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer // the end result can be disconnect. public synchronized void startProducer() { - if (isClosed) { - log.info("[{}] Do not start replicator because of replicator is already closed.", replicatorId); - return; - } + isClosed = false; + startProducerInternal(); + } + public synchronized void startProducerInternal() { if (STATE_UPDATER.get(this) == State.Stopping) { long waitTimeMs = backOff.next(); if (log.isDebugEnabled()) { @@ -171,9 +171,14 @@ public synchronized void startProducer() { } protected void checkTopicActiveAndRetryStartProducer() { + // if replicator is closed do not retry start producer + if (isClosed) { + log.info("[{}] Do not retry start replicator because of replicator is already closed.", replicatorId); + return; + } isLocalTopicActive().thenAccept(isTopicActive -> { if (isTopicActive) { - startProducer(); + startProducerInternal(); } }).exceptionally(ex -> { log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId, @@ -234,6 +239,8 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return disconnectFuture; } + isClosed = true; + if (STATE_UPDATER.get(this) == State.Stopping) { // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by // closeProducerAsync() @@ -250,11 +257,6 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return closeProducerAsync(); } - public CompletableFuture close() { - isClosed = true; - return disconnect(); - } - public CompletableFuture remove() { // No-op return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d5aa1e5d2f405..dfeb03a254698 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1821,7 +1821,7 @@ CompletableFuture removeReplicator(String remoteCluster) { String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::close) + Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect) .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override @@ -1893,7 +1893,7 @@ CompletableFuture removeShadowReplicator(String shadowTopic) { log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic); final CompletableFuture future = new CompletableFuture<>(); String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic); - shadowReplicators.get(shadowTopic).close().thenRun(() -> { + shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 88529b3709fd8..762d8d86e4896 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -154,8 +154,7 @@ public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Excepti final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, replicatorPrefix, broker, remoteClient); replicator.startProducer(); - // Close replicator. - replicator.close(); + replicator.disconnect(); // Verify task will done. Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { From bd85bbaf17384f67d4e2ab9d25509df89d23c254 Mon Sep 17 00:00:00 2001 From: hanmz Date: Fri, 22 Sep 2023 14:52:14 +0800 Subject: [PATCH 7/8] [fix][broker] release orphan replicator after replicator is closed. --- .../java/org/apache/pulsar/broker/service/Replicator.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 663c103f3b0ef..482fa2cbd2300 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -51,8 +51,4 @@ default Optional getRateLimiter() { boolean isConnected(); long getNumberOfEntriesInBacklog(); - - default CompletableFuture close() { - return CompletableFuture.completedFuture(null); - } } From 8686379bc7e5fd0aad8a6d29052ca1634105d998 Mon Sep 17 00:00:00 2001 From: hanmz Date: Fri, 22 Sep 2023 14:58:13 +0800 Subject: [PATCH 8/8] [fix][broker] release orphan replicator after replicator is closed. --- .../org/apache/pulsar/broker/service/AbstractReplicator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 2b3d3fb025bde..28226be496859 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -118,6 +118,8 @@ public String getRemoteCluster() { // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer // the end result can be disconnect. public synchronized void startProducer() { + // This method comes from some actives call and may be call again after disconnect + // so here we will first mark isClosed is false isClosed = false; startProducerInternal(); }