Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ protected enum State {
Stopped, Starting, Started, Stopping
}

private volatile boolean isClosed = false;

public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,
String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
Expand Down Expand Up @@ -116,6 +118,13 @@ public String getRemoteCluster() {
// This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer
// the end result can be disconnect.
public synchronized void startProducer() {
// This method comes from some actives call and may be call again after disconnect
// so here we will first mark isClosed is false
isClosed = false;
startProducerInternal();
}

public synchronized void startProducerInternal() {
if (STATE_UPDATER.get(this) == State.Stopping) {
long waitTimeMs = backOff.next();
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -164,9 +173,14 @@ public synchronized void startProducer() {
}

protected void checkTopicActiveAndRetryStartProducer() {
// if replicator is closed do not retry start producer
if (isClosed) {
log.info("[{}] Do not retry start replicator because of replicator is already closed.", replicatorId);
return;
}
isLocalTopicActive().thenAccept(isTopicActive -> {
if (isTopicActive) {
startProducer();
startProducerInternal();
}
}).exceptionally(ex -> {
log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId,
Expand Down Expand Up @@ -227,6 +241,8 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return disconnectFuture;
}

isClosed = true;

if (STATE_UPDATER.get(this) == State.Stopping) {
// Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by
// closeProducerAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,68 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
});
}

@Test
public void testExitRetryStartProducerAfterReplicatorDisconnect() throws Exception {
final String localCluster = "localCluster";
final String remoteCluster = "remoteCluster";
final String topicName = "remoteTopicName";
final String replicatorPrefix = "pulsar.repl";
final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
// Mock services.
final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class);
final PulsarService pulsar = mock(PulsarService.class);
final BrokerService broker = mock(BrokerService.class);
final Topic localTopic = mock(Topic.class);
ConnectionPool connectionPool = mock(ConnectionPool.class);
final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
when(localClient.getCnxPool()).thenReturn(connectionPool);
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
when(remoteClient.getCnxPool()).thenReturn(connectionPool);
final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
topics.put(topicName, CompletableFuture.completedFuture(Optional.of(localTopic)));
when(broker.executor()).thenReturn(eventLoopGroup);
when(broker.getTopics()).thenReturn(topics);
when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
when(broker.pulsar()).thenReturn(pulsar);
when(pulsar.getClient()).thenReturn(localClient);
when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
when(localTopic.getName()).thenReturn(topicName);
when(producerBuilder.topic(any())).thenReturn(producerBuilder);
when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
// Mock create producer fail.
when(producerBuilder.create()).thenThrow(new RuntimeException("mocked ex"));
when(producerBuilder.createAsync())
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("mocked ex")));
// Make race condition: "retry start producer" and "close replicator".
final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName,
replicatorPrefix, broker, remoteClient);
replicator.startProducer();
replicator.disconnect();

// Verify task will done.
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
AtomicInteger taskCounter = new AtomicInteger();
CountDownLatch checkTaskFinished = new CountDownLatch(1);
eventLoopGroup.execute(() -> {
synchronized (replicator) {
LinkedBlockingQueue taskQueue = WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue");
DefaultPriorityQueue scheduledTaskQueue =
WhiteboxImpl.getInternalState(eventLoopGroup, "scheduledTaskQueue");
taskCounter.set(taskQueue.size() + scheduledTaskQueue.size());
checkTaskFinished.countDown();
}
});
checkTaskFinished.await();
Assert.assertEquals(taskCounter.get(), 0);
});
}

private static class ReplicatorInTest extends AbstractReplicator {

public ReplicatorInTest(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,
Expand Down