diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 6dd296d16b53b..28226be496859 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -70,6 +70,8 @@ protected enum State { Stopped, Starting, Started, Stopping } + private volatile boolean isClosed = false; + public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { @@ -116,6 +118,13 @@ public String getRemoteCluster() { // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer // the end result can be disconnect. public synchronized void startProducer() { + // This method is reached through active (explicit) calls and may be called again after a disconnect, + // so reset isClosed to false first, before starting the producer. + isClosed = false; + startProducerInternal(); + } + + public synchronized void startProducerInternal() { if (STATE_UPDATER.get(this) == State.Stopping) { long waitTimeMs = backOff.next(); if (log.isDebugEnabled()) { @@ -164,9 +173,14 @@ public synchronized void startProducer() { } protected void checkTopicActiveAndRetryStartProducer() { + // If the replicator is closed, do not retry starting the producer. + if (isClosed) { + log.info("[{}] Do not retry start replicator because of replicator is already closed.", replicatorId); + return; + } isLocalTopicActive().thenAccept(isTopicActive -> { if (isTopicActive) { - startProducer(); + startProducerInternal(); } }).exceptionally(ex -> { log.warn("[{}] Stop retry to create producer due to topic load fail. 
Replicator state: {}", replicatorId, @@ -227,6 +241,8 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return disconnectFuture; } + isClosed = true; + if (STATE_UPDATER.get(this) == State.Stopping) { // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by // closeProducerAsync() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index f8034c37971cc..762d8d86e4896 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -112,6 +112,68 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { }); } + @Test + public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Exception { + final String localCluster = "localCluster"; + final String remoteCluster = "remoteCluster"; + final String topicName = "remoteTopicName"; + final String replicatorPrefix = "pulsar.repl"; + final DefaultEventLoop eventLoopGroup = new DefaultEventLoop(); + // Mock services. 
+ final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class); + final PulsarService pulsar = mock(PulsarService.class); + final BrokerService broker = mock(BrokerService.class); + final Topic localTopic = mock(Topic.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + final PulsarClientImpl localClient = mock(PulsarClientImpl.class); + when(localClient.getCnxPool()).thenReturn(connectionPool); + final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class); + when(remoteClient.getCnxPool()).thenReturn(connectionPool); + final ProducerBuilder producerBuilder = mock(ProducerBuilder.class); + final ConcurrentOpenHashMap>> topics = new ConcurrentOpenHashMap<>(); + topics.put(topicName, CompletableFuture.completedFuture(Optional.of(localTopic))); + when(broker.executor()).thenReturn(eventLoopGroup); + when(broker.getTopics()).thenReturn(topics); + when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder); + when(broker.pulsar()).thenReturn(pulsar); + when(pulsar.getClient()).thenReturn(localClient); + when(pulsar.getConfiguration()).thenReturn(pulsarConfig); + when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100); + when(localTopic.getName()).thenReturn(topicName); + when(producerBuilder.topic(any())).thenReturn(producerBuilder); + when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder); + when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder); + when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder); + when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder); + when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder); + // Mock producer creation so that it always fails. 
+ when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex")); + when(producerBuilder.createAsync()) + .thenReturn(CompletableFuture.failedFuture(new RuntimeException("mocked ex"))); + // Create a race between "retry start producer" and "close replicator". + final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, + replicatorPrefix, broker, remoteClient); + replicator.startProducer(); + replicator.disconnect(); + + // Verify that the event loop has no queued or scheduled tasks left. + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + AtomicInteger taskCounter = new AtomicInteger(); + CountDownLatch checkTaskFinished = new CountDownLatch(1); + eventLoopGroup.execute(() -> { + synchronized (replicator) { + LinkedBlockingQueue taskQueue = WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue"); + DefaultPriorityQueue scheduledTaskQueue = + WhiteboxImpl.getInternalState(eventLoopGroup, "scheduledTaskQueue"); + taskCounter.set(taskQueue.size() + scheduledTaskQueue.size()); + checkTaskFinished.countDown(); + } + }); + checkTaskFinished.await(); + Assert.assertEquals(taskCounter.get(), 0); + }); + } + private static class ReplicatorInTest extends AbstractReplicator { public ReplicatorInTest(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,