diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 1b5b2824257b0..f34144deb0ab0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,16 +18,22 @@ */ package org.apache.pulsar.broker.service; +import com.google.common.annotations.VisibleForTesting; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import lombok.Getter; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.Backoff; @@ -39,7 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractReplicator { +public abstract class AbstractReplicator implements Replicator { protected final BrokerService brokerService; protected final String localTopicName; @@ -64,10 +70,31 @@ public abstract class AbstractReplicator { protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); - private volatile State state = State.Stopped; - - protected enum State { - Stopped, Starting, Started, Stopping + @VisibleForTesting + @Getter + protected volatile State state = State.Disconnected; + + public enum State { + /** + * This enum has two mean meanings: + * Init: replicator is just created, has not been started now. + * Disconnected: the producer was closed after {@link PersistentTopic#checkGC} called {@link #disconnect}. + */ + // The internal producer is disconnected. + Disconnected, + // Trying to create a new internal producer. + Starting, + // The internal producer has started, and tries copy data. + Started, + /** + * The producer is closing after {@link PersistentTopic#checkGC} called {@link #disconnect}. + */ + // The internal producer is trying to disconnect. + Disconnecting, + // The replicator is in terminating. + Terminating, + // The replicator is never used again. Pulsar will create a new Replicator when enable replication again. + Terminated; } public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, @@ -96,16 +123,16 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getProducerName()); - STATE_UPDATER.set(this, State.Stopped); + STATE_UPDATER.set(this, State.Disconnected); } protected abstract String getProducerName(); - protected abstract void readEntries(org.apache.pulsar.client.api.Producer producer); + protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer producer); protected abstract Position getReplicatorReadPosition(); - protected abstract long getNumberOfEntriesInBacklog(); + public abstract long getNumberOfEntriesInBacklog(); protected abstract void disableReplicatorRead(); @@ -113,66 +140,121 @@ public String getRemoteCluster() { return remoteCluster; } - // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer - // the end result can be disconnect. - public synchronized void startProducer() { - if (STATE_UPDATER.get(this) == State.Stopping) { - long waitTimeMs = backOff.next(); - if (log.isDebugEnabled()) { - log.debug( - "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", - replicatorId, waitTimeMs / 1000.0); - } - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); - return; - } - State state = STATE_UPDATER.get(this); - if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { - if (state == State.Started) { - // Already running + public void startProducer() { + // Guarantee only one task call "producerBuilder.createAsync()". + Pair setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting); + if (!setStartingRes.getLeft()) { + if (setStartingRes.getRight() == State.Starting) { + log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", + replicatorId, state); + } else if (setStartingRes.getRight() == State.Started) { + // Since the method "startProducer" will be called even if it is started, only print debug-level log. + if (log.isDebugEnabled()) { + log.debug("[{}] Replicator was already running. state: {}", replicatorId, state); + } + } else if (setStartingRes.getRight() == State.Disconnecting) { if (log.isDebugEnabled()) { - log.debug("[{}] Replicator was already running", replicatorId); + log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)." + + " state: {}", replicatorId, state); } + delayStartProducerAfterDisconnected(); } else { - log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state); + /** {@link State.Terminating}, {@link State.Terminated}. **/ + log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state); } - return; } log.info("[{}] Starting replicator", replicatorId); producerBuilder.createAsync().thenAccept(producer -> { - readEntries(producer); + setProducerAndTriggerReadEntries(producer); }).exceptionally(ex -> { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + Pair setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected); + if (setDisconnectedRes.getLeft()) { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } else { - log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); + if (setDisconnectedRes.getRight() == State.Terminating + || setDisconnectedRes.getRight() == State.Terminated) { + log.info("[{}] Skip to create producer, because it has been terminated, state is : {}", + replicatorId, state); + } else { + /** {@link State.Disconnected}, {@link State.Starting}, {@link State.Started} **/ + // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Other thread will try to create the producer again. so skipped current one task." + + " State is : {}", + replicatorId, state); + } } return null; }); + } + /*** + * The producer is disconnecting, delay to start the producer. + * If we start a producer immediately, we will get a conflict producer(same name producer) registered error. + */ + protected void delayStartProducerAfterDisconnected() { + long waitTimeMs = backOff.next(); + if (log.isDebugEnabled()) { + log.debug( + "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", + replicatorId, waitTimeMs / 1000.0); + } + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } - protected void checkTopicActiveAndRetryStartProducer() { - isLocalTopicActive().thenAccept(isTopicActive -> { - if (isTopicActive) { - startProducer(); + protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { + brokerService.executor().schedule(() -> { + if (state == State.Terminating || state == State.Terminated) { + log.info("[{}] Skip scheduled to start the producer since the replicator state is : {}", + replicatorId, state); + return; } - }).exceptionally(ex -> { - log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); - return null; - }); + CompletableFuture> topicFuture = brokerService.getTopics().get(localTopicName); + if (topicFuture == null) { + // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed successfully." + + " And trigger a terminate.", replicatorId); + terminate(); + return; + } + topicFuture.thenAccept(optional -> { + if (optional.isEmpty()) { + // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); + return; + } + if (optional.get() != localTopic) { + // Topic closed and created a new one, current replicator is outdated. + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); + return; + } + Replicator replicator = localTopic.getReplicators().get(remoteCluster); + if (replicator != AbstractReplicator.this) { + // Current replicator has been closed, and created a new one. + log.info("[{}] Skip scheduled to start the producer since a new replicator has instead current" + + " one. And trigger a terminate.", replicatorId); + terminate(); + return; + } + startProducer(); + }).exceptionally(ex -> { + log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" + + " trigger a terminate. Replicator state: {}", + localTopicName, replicatorId, STATE_UPDATER.get(this), ex); + terminate(); + return null; + }); + }, waitTimeMs, TimeUnit.MILLISECONDS); } protected CompletableFuture isLocalTopicActive() { @@ -188,58 +270,130 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } - protected synchronized CompletableFuture closeProducerAsync() { - if (producer == null) { - STATE_UPDATER.set(this, State.Stopped); + /** + * This method only be used by {@link PersistentTopic#checkGC} now. + */ + public CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) { + long backlog = getNumberOfEntriesInBacklog(); + if (failIfHasBacklog && backlog > 0) { + CompletableFuture disconnectFuture = new CompletableFuture<>(); + disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); + if (log.isDebugEnabled()) { + log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); + } + return disconnectFuture; + } + log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, + getReplicatorReadPosition(), backlog); + return closeProducerAsync(closeTheStartingProducer); + } + + /** + * This method only be used by {@link PersistentTopic#checkGC} now. + */ + protected CompletableFuture closeProducerAsync(boolean closeTheStartingProducer) { + Pair setDisconnectingRes = compareSetAndGetState(State.Started, State.Disconnecting); + if (!setDisconnectingRes.getLeft()) { + if (setDisconnectingRes.getRight() == State.Starting) { + if (closeTheStartingProducer) { + /** + * Delay retry(wait for the start producer task is finish). + * Note: If the producer always start fail, the start producer task will always retry until the + * state changed to {@link State.Terminated}. + * Nit: The better solution is creating a {@link CompletableFuture} to trace the in-progress + * creation and call "inProgressCreationFuture.thenApply(closeProducer())". + */ + long waitTimeMs = backOff.next(); + brokerService.executor().schedule(() -> closeProducerAsync(true), + waitTimeMs, TimeUnit.MILLISECONDS); + } else { + log.info("[{}] Skip current producer closing since the previous producer has been closed," + + " and trying start a new one, state : {}", + replicatorId, setDisconnectingRes.getRight()); + } + } else if (setDisconnectingRes.getRight() == State.Disconnected + || setDisconnectingRes.getRight() == State.Disconnecting) { + log.info("[{}] Skip current producer closing since other thread did closing, state : {}", + replicatorId, setDisconnectingRes.getRight()); + } else if (setDisconnectingRes.getRight() == State.Terminating + || setDisconnectingRes.getRight() == State.Terminated) { + log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}", + replicatorId, state); + } + log.info("[{}] Skip current termination since other thread is doing close producer or termination," + + " state : {}", replicatorId, state); return CompletableFuture.completedFuture(null); } - CompletableFuture future = producer.closeAsync(); + + // Close producer and update state. + return doCloseProducerAsync(producer, () -> { + Pair setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected); + if (setDisconnectedRes.getLeft()) { + this.producer = null; + // deactivate further read + disableReplicatorRead(); + return; + } + if (setDisconnectedRes.getRight() == State.Terminating + || setDisconnectingRes.getRight() == State.Terminated) { + log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", + replicatorId, state); + } else { + // Since only one task can call "doCloseProducerAsync(producer, action)", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Other task has change the state to terminated. so skipped current one task." + + " State is : {}", + replicatorId, state); + } + }); + } + + protected CompletableFuture doCloseProducerAsync(Producer producer, Runnable actionAfterClosed) { + CompletableFuture future = + producer == null ? CompletableFuture.completedFuture(null) : producer.closeAsync(); return future.thenRun(() -> { - STATE_UPDATER.set(this, State.Stopped); - this.producer = null; - // deactivate further read - disableReplicatorRead(); + actionAfterClosed.run(); }).exceptionally(ex -> { long waitTimeMs = backOff.next(); log.warn( - "[{}] Exception: '{}' occurred while trying to close the producer." - + " retrying again in {} s", - replicatorId, ex.getMessage(), waitTimeMs / 1000.0); + "[{}] Exception: '{}' occurred while trying to close the producer. Replicator state: {}." + + " Retrying again in {} s.", + replicatorId, ex.getMessage(), state, waitTimeMs / 1000.0); // BackOff before retrying - brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(() -> doCloseProducerAsync(producer, actionAfterClosed), + waitTimeMs, TimeUnit.MILLISECONDS); return null; }); } - - public CompletableFuture disconnect() { - return disconnect(false); + public CompletableFuture terminate() { + if (!tryChangeStatusToTerminating()) { + log.info("[{}] Skip current termination since other thread is doing termination, state : {}", replicatorId, + state); + return CompletableFuture.completedFuture(null); + } + return doCloseProducerAsync(producer, () -> { + STATE_UPDATER.set(this, State.Terminated); + this.producer = null; + // set the cursor as inactive. + disableReplicatorRead(); + }); } - public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { - if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) { - CompletableFuture disconnectFuture = new CompletableFuture<>(); - disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); - if (log.isDebugEnabled()) { - log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); - } - return disconnectFuture; + protected boolean tryChangeStatusToTerminating() { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){ + return true; } - - if (STATE_UPDATER.get(this) == State.Stopping) { - // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by - // closeProducerAsync() - // which will at some point change the state to stopped - return CompletableFuture.completedFuture(null); + if (STATE_UPDATER.compareAndSet(this, State.Started, State.Terminating)){ + return true; } - - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) - || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) { - log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, - getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); + if (STATE_UPDATER.compareAndSet(this, State.Disconnecting, State.Terminating)){ + return true; } - - return closeProducerAsync(); + if (STATE_UPDATER.compareAndSet(this, State.Disconnected, State.Terminating)) { + return true; + } + return false; } public CompletableFuture remove() { @@ -300,4 +454,18 @@ public static CompletableFuture validatePartitionedTopicAsync(String topic public State getState() { return state; } + + protected ImmutablePair compareSetAndGetState(State expect, State update) { + State original1 = state; + if (STATE_UPDATER.compareAndSet(this, expect, update)) { + return ImmutablePair.of(true, expect); + } + State original2 = state; + // Maybe the value changed more than once even if "original1 == original2", but the probability is very small, + // so let's ignore this case for prevent using a lock. + if (original1 == original2) { + return ImmutablePair.of(false, original1); + } + return compareSetAndGetState(expect, update); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2687532693a45..1212399cf5f5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -742,7 +742,7 @@ public CompletableFuture closeAndRemoveReplicationClient(String clusterNam if (ot.isPresent()) { Replicator r = ot.get().getReplicators().get(clusterName); if (r != null && r.isConnected()) { - r.disconnect(false).whenComplete((v, e) -> f.complete(null)); + r.terminate().whenComplete((v, e) -> f.complete(null)); return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 482fa2cbd2300..8130b855b4e4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -29,9 +29,9 @@ public interface Replicator { ReplicatorStatsImpl getStats(); - CompletableFuture disconnect(); + CompletableFuture terminate(); - CompletableFuture disconnect(boolean b); + CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer); void updateRates(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 087c5f932008f..51509f3818a28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -67,7 +67,7 @@ protected String getProducerName() { } @Override - protected void readEntries(Producer producer) { + protected void setProducerAndTriggerReadEntries(Producer producer) { this.producer = (ProducerImpl) producer; if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { @@ -78,8 +78,7 @@ protected void readEntries(Producer producer) { "[{}] Replicator was stopped while creating the producer." + " Closing it. Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); - STATE_UPDATER.set(this, State.Stopping); - closeProducerAsync(); + doCloseProducerAsync(producer, () -> {}); return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 0ac06d6883ff1..8463b9b0d2f9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -420,7 +420,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c CompletableFuture closeClientFuture = new CompletableFuture<>(); if (closeIfClientsConnected) { List> futures = new ArrayList<>(); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); producers.values().forEach(producer -> futures.add(producer.disconnect())); subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty()))); FutureUtil.waitForAll(futures).thenRun(() -> { @@ -523,7 +523,7 @@ public CompletableFuture close( List> futures = new ArrayList<>(); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { @@ -582,7 +582,7 @@ public CompletableFuture close( public CompletableFuture stopReplProducers() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); return FutureUtil.waitForAll(closeFutures); } @@ -662,7 +662,7 @@ CompletableFuture removeReplicator(String remoteCluster) { String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - replicators.get(remoteCluster).disconnect().thenRun(() -> { + replicators.get(remoteCluster).terminate().thenRun(() -> { log.info("[{}] Successfully removed replicator {}", name, remoteCluster); replicators.remove(remoteCluster); @@ -1031,7 +1031,7 @@ private CompletableFuture disconnectReplicators() { List> futures = new ArrayList<>(); ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { - futures.add(replicator.disconnect()); + futures.add(replicator.terminate()); }); return FutureUtil.waitForAll(futures); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 754d25b8b0ab4..5e1cc4a936a75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminating; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; @@ -26,7 +30,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -43,10 +46,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override - protected void readEntries(Producer producer) { - // Rewind the cursor to be sure to read again all non-acked messages sent while restarting + protected void setProducerAndTriggerReadEntries(Producer producer) { + // Rewind the cursor to be sure to read again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); - HAVE_PENDING_READ_UPDATER.set(this, FALSE); - this.producer = (ProducerImpl) producer; - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { - log.info("[{}] Created replicator producer", replicatorId); + /** + * 1. Try change state to {@link Started}. + * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value + * producer when the state is {@link Started}. + */ + Pair changeStateRes; + changeStateRes = compareSetAndGetState(Starting, Started); + if (changeStateRes.getLeft()) { + this.producer = (ProducerImpl) producer; + HAVE_PENDING_READ_UPDATER.set(this, FALSE); + // Trigger a new read. + log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state); backOff.reset(); - // activate cursor: so, entries can be cached + // activate cursor: so, entries can be cached. this.cursor.setActive(); // read entries readMoreEntries(); } else { - log.info( - "[{}] Replicator was stopped while creating the producer." - + " Closing it. Replicator state: {}", - replicatorId, STATE_UPDATER.get(this)); - STATE_UPDATER.set(this, State.Stopping); - closeProducerAsync(); + if (changeStateRes.getRight() == Started) { + // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Replicator was already started by another thread while creating the producer." + + " Closing the producer newly created. Replicator state: {}", replicatorId, state); + } else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) { + log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}", + replicatorId, state); + } else { + log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}", + replicatorId, changeStateRes.getRight()); + } + // Close the producer if change the state fail. + doCloseProducerAsync(producer, () -> {}); } - } @Override @@ -420,8 +437,8 @@ public CompletableFuture getFuture() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - if (STATE_UPDATER.get(this) != State.Started) { - log.info("[{}] Replicator was stopped while reading entries." + if (state != Started) { + log.info("[{}] Replicator was disconnected while reading entries." + " Stop reading. Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); return; @@ -436,8 +453,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Error reading entries because replicator is" + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); - // replicator is already deleted and cursor is already closed so, producer should also be stopped - closeProducerAsync(); + // replicator is already deleted and cursor is already closed so, producer should also be disconnected. + terminate(); return; } else if (!(exception instanceof TooManyRequestsException)) { log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})", @@ -555,8 +572,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { if (exception instanceof CursorAlreadyClosedException) { log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); - // replicator is already deleted and cursor is already closed so, producer should also be stopped - closeProducerAsync(); + // replicator is already deleted and cursor is already closed so, producer should also be disconnected. + terminate(); return; } if (ctx instanceof PositionImpl) { @@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl< } } - @Override - public CompletableFuture disconnect() { - return disconnect(false); - } - - @Override - public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { - final CompletableFuture future = new CompletableFuture<>(); - - super.disconnect(failIfHasBacklog).thenRun(() -> { - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - future.complete(null); - }).exceptionally(ex -> { - Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); - if (!(t instanceof TopicBusyException)) { - log.error("[{}] Failed to close dispatch rate limiter: {}", replicatorId, ex.getMessage()); - } - future.completeExceptionally(t); - return null; - }); - - return future; - } - @Override public boolean isConnected() { ProducerImpl producer = this.producer; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e4441969101c1..cc2712d0ca5d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -833,15 +833,15 @@ public CompletableFuture startReplProducers() { public CompletableFuture stopReplProducers() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect())); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate())); return FutureUtil.waitForAll(closeFutures); } private synchronized CompletableFuture closeReplProducersIfNoBacklog() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true))); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true))); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true, true))); + shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true, true))); return FutureUtil.waitForAll(closeFutures); } @@ -1423,8 +1423,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, List> futures = new ArrayList<>(); subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty()))); if (closeIfClientsConnected) { - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); producers.values().forEach(producer -> futures.add(producer.disconnect())); } FutureUtil.waitForAll(futures).thenRunAsync(() -> { @@ -1565,8 +1565,8 @@ public CompletableFuture close( List> futures = new ArrayList<>(); futures.add(transactionBuffer.closeAsync()); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { @@ -1941,7 +1941,7 @@ CompletableFuture removeReplicator(String remoteCluster) { String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect) + Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::terminate) .orElse(CompletableFuture.completedFuture(null)).thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override @@ -2013,7 +2013,7 @@ CompletableFuture removeShadowReplicator(String shadowTopic) { log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic); final CompletableFuture future = new CompletableFuture<>(); String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic); - shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> { + shadowReplicators.get(shadowTopic).terminate().thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override @@ -2897,7 +2897,7 @@ private CompletableFuture checkAndDisconnectReplicators() { ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { if (replicator.getNumberOfEntriesInBacklog() <= 0) { - futures.add(replicator.disconnect()); + futures.add(replicator.terminate()); } }); return FutureUtil.waitForAll(futures); @@ -2948,6 +2948,15 @@ public void checkGC() { log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic, maxInactiveDurationInSec); } + /** + * There is a race condition that may cause a NPE: + * - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication. + * - task 2: "closeReplProducersIfNoBacklog" called by current thread will make the variable + * "replicator.producer" to a null value. + * Race condition: task 1 will get a NPE when it tries to send messages using the variable + * "replicator.producer", because task 2 will set this variable to "null". + * TODO Create a seperated PR to fix it. + */ closeReplProducersIfNoBacklog().thenRun(() -> { if (hasRemoteProducers()) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 8699c73246830..7aebf20896c2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -94,7 +95,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, replicatorPrefix, broker, remoteClient); replicator.startProducer(); - replicator.disconnect(); + replicator.terminate(); // Verify task will done. Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { @@ -129,7 +130,7 @@ protected String getProducerName() { } @Override - protected void readEntries(Producer producer) { + protected void setProducerAndTriggerReadEntries(Producer producer) { } @@ -139,7 +140,22 @@ protected Position getReplicatorReadPosition() { } @Override - protected long getNumberOfEntriesInBacklog() { + public ReplicatorStatsImpl getStats() { + return null; + } + + @Override + public void updateRates() { + + } + + @Override + public boolean isConnected() { + return false; + } + + @Override + public long getNumberOfEntriesInBacklog() { return 0; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 1accd04f4918c..f9184f2288f52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -18,28 +18,56 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; +import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class OneWayReplicatorTest extends OneWayReplicatorTestBase { @@ -78,7 +106,7 @@ private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator return originalValue; } - @Test + @Test(timeOut = 45 * 1000) public void testReplicatorProducerStatInTopic() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); final String subscribeName = "subscribe_1"; @@ -104,7 +132,7 @@ public void testReplicatorProducerStatInTopic() throws Exception { }); } - @Test + @Test(timeOut = 45 * 1000) public void testCreateRemoteConsumerFirst() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); Producer producer1 = client1.newProducer(Schema.STRING).topic(topicName).create(); @@ -124,29 +152,257 @@ public void testCreateRemoteConsumerFirst() throws Exception { }); } - @Test + @Test(timeOut = 45 * 1000) public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); admin1.topics().createNonPartitionedTopic(topicName); // Wait for replicator started. waitReplicatorStarted(topicName); - PersistentTopic persistentTopic = + PersistentTopic topic1 = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); - PersistentReplicator replicator = - (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + PersistentReplicator replicator1 = + (PersistentReplicator) topic1.getReplicators().values().iterator().next(); // Mock an error when calling "replicator.disconnect()" - ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class); - Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex"))); - ProducerImpl originalProducer = overrideProducerForReplicator(replicator, mockProducer); + AtomicBoolean closeFailed = new AtomicBoolean(true); + final ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class); + final AtomicReference originalProducer1 = new AtomicReference(); + doAnswer(invocation -> { + if (closeFailed.get()) { + return CompletableFuture.failedFuture(new Exception("mocked ex")); + } else { + return originalProducer1.get().closeAsync(); + } + }).when(mockProducer).closeAsync(); + originalProducer1.set(overrideProducerForReplicator(replicator1, mockProducer)); // Verify: since the "replicator.producer.closeAsync()" will retry after it failed, the topic unload should be // successful. admin1.topics().unload(topicName); // Verify: After "replicator.producer.closeAsync()" retry again, the "replicator.producer" will be closed // successful. - overrideProducerForReplicator(replicator, originalProducer); + closeFailed.set(false); + AtomicReference topic2 = new AtomicReference(); + AtomicReference replicator2 = new AtomicReference(); Awaitility.await().untilAsserted(() -> { + topic2.set((PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get()); + replicator2.set((PersistentReplicator) topic2.get().getReplicators().values().iterator().next()); + // It is a new Topic after reloading. + assertNotEquals(topic2.get(), topic1); + assertNotEquals(replicator2.get(), replicator1); + }); + Awaitility.await().untilAsserted(() -> { + // Old replicator should be closed. + Assert.assertFalse(replicator1.isConnected()); + Assert.assertFalse(originalProducer1.get().isConnected()); + // New replicator should be connected. + Assert.assertTrue(replicator2.get().isConnected()); + }); + // cleanup. + cleanupTopics(() -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + } + + private void injectMockReplicatorProducerBuilder( + BiFunction producerDecorator) + throws Exception { + String cluster2 = pulsar2.getConfig().getClusterName(); + BrokerService brokerService = pulsar1.getBrokerService(); + // Wait for the internal client created. + final String topicNameTriggerInternalClientCreate = + BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate); + waitReplicatorStarted(topicNameTriggerInternalClientCreate); + cleanupTopics(() -> { + admin1.topics().delete(topicNameTriggerInternalClientCreate); + admin2.topics().delete(topicNameTriggerInternalClientCreate); + }); + + // Inject spy client. + ConcurrentOpenHashMap + replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients"); + PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); + PulsarClient spyClient = spy(internalClient); + replicationClients.put(cluster2, spyClient); + + // Inject producer decorator. + doAnswer(invocation -> { + Schema schema = (Schema) invocation.getArguments()[0]; + ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) internalClient.newProducer(schema); + ProducerBuilder spyProducerBuilder = spy(producerBuilder); + doAnswer(ignore -> { + CompletableFuture producerFuture = new CompletableFuture<>(); + producerBuilder.createAsync().whenComplete((p, t) -> { + if (t != null) { + producerFuture.completeExceptionally(t); + return; + } + ProducerImpl pImpl = (ProducerImpl) p; + new FastThreadLocalThread(() -> { + try { + ProducerImpl newProducer = producerDecorator.apply(producerBuilder.getConf(), pImpl); + producerFuture.complete(newProducer); + } catch (Exception ex) { + producerFuture.completeExceptionally(ex); + } + }).start(); + }); + + return producerFuture; + }).when(spyProducerBuilder).createAsync(); + return spyProducerBuilder; + }).when(spyClient).newProducer(any(Schema.class)); + } + + private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception { + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(cursorName); + ManagedCursorImpl spyCursor = spy(cursor); + // remove cursor. + ml.getCursors().removeCursor(cursorName); + ml.deactivateCursor(cursor); + // Add the spy one. addCursor(ManagedCursorImpl cursor) + Method m = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", new Class[]{ManagedCursorImpl.class}); + m.setAccessible(true); + m.invoke(ml, new Object[]{spyCursor}); + return new SpyCursor(cursor, spyCursor); + } + + @Data + @AllArgsConstructor + static class SpyCursor { + ManagedCursorImpl original; + ManagedCursorImpl spy; + } + + private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor) throws Exception { + CountDownLatch startCloseSignal = new CountDownLatch(1); + CountDownLatch startCallbackSignal = new CountDownLatch(1); + doAnswer(invocation -> { + AsyncCallbacks.CloseCallback originalCallback = (AsyncCallbacks.CloseCallback) invocation.getArguments()[0]; + Object ctx = invocation.getArguments()[1]; + AsyncCallbacks.CloseCallback newCallback = new AsyncCallbacks.CloseCallback() { + @Override + public void closeComplete(Object ctx) { + new FastThreadLocalThread(new Runnable() { + @Override + @SneakyThrows + public void run() { + startCallbackSignal.await(); + originalCallback.closeComplete(ctx); + } + }).start(); + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + new FastThreadLocalThread(new Runnable() { + @Override + @SneakyThrows + public void run() { + startCallbackSignal.await(); + originalCallback.closeFailed(exception, ctx); + } + }).start(); + } + }; + startCloseSignal.await(); + spyCursor.original.asyncClose(newCallback, ctx); + return null; + }).when(spyCursor.spy).asyncClose(any(AsyncCallbacks.CloseCallback.class), any()); + return new CursorCloseSignal(startCloseSignal, startCallbackSignal); + } + + @AllArgsConstructor + static class CursorCloseSignal { + CountDownLatch startCloseSignal; + CountDownLatch startCallbackSignal; + + void startClose() { + startCloseSignal.countDown(); + } + + void startCallback() { + startCallbackSignal.countDown(); + } + } + + /** + * See the description and execution flow: https://github.com/apache/pulsar/pull/21946. + * Steps: + * - Create topic, but the internal producer of Replicator created failed. + * - Unload bundle, the Replicator will be closed, but the internal producer creation retry has not executed yet. + * - The internal producer creation retry execute successfully, the "repl.cursor" has not been closed yet. + * - The topic is wholly closed. + * - Verify: the delayed created internal producer will be closed. + */ + @Test(timeOut = 120 * 1000) + public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); + // Inject an error for "replicator.producer" creation. + // The delay time of next retry to create producer is below: + // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s... + // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. + final AtomicInteger createProducerCounter = new AtomicInteger(); + final int failTimes = 6; + injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { + if (topicName.equals(producerCnf.getTopicName())) { + // There is a switch to determine create producer successfully or not. + if (createProducerCounter.incrementAndGet() > failTimes) { + return originalProducer; + } + log.info("Retry create replicator.producer count: {}", createProducerCounter); + // Release producer and fail callback. + originalProducer.closeAsync(); + throw new RuntimeException("mock error"); + } + return originalProducer; + }); + + // Create topic. + admin1.topics().createNonPartitionedTopic(topicName); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentReplicator replicator = + (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + // Since we inject a producer creation error, the replicator can not start successfully. + assertFalse(replicator.isConnected()); + + // Stuck the closing of the cursor("pulsar.repl"), until the internal producer of the replicator started. + SpyCursor spyCursor = + spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName()); + CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor); + + // Unload bundle: call "topic.close(false)". + // Stuck start new producer, until the state of replicator change to Stopped. + // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully. + Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + assertTrue(createProducerCounter.get() >= failTimes, + "count of retry to create producer is " + createProducerCounter.get()); + }); + CompletableFuture topicCloseFuture = persistentTopic.close(true); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + String state = String.valueOf(replicator.getState()); + assertTrue(state.equals("Stopped") || state.equals("Terminated")); + }); + + // Delay close cursor, until "replicator.producer" create successfully. + // The next once retry time of create "replicator.producer" will be 3.2s. + Thread.sleep(4 * 1000); + log.info("Replicator.state: {}", replicator.getState()); + cursorCloseSignal.startClose(); + cursorCloseSignal.startCallback(); + + // Wait for topic close successfully. + // Verify there is no orphan producer on the remote cluster. + topicCloseFuture.join(); + Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + assertEquals(persistentTopic2.getProducers().size(), 0); Assert.assertFalse(replicator.isConnected()); }); + // cleanup. cleanupTopics(() -> { admin1.topics().delete(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 33620716288af..8e8b444f952c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -18,21 +18,28 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.google.common.collect.Sets; import java.net.URL; +import java.time.Duration; import java.util.Collections; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.testng.Assert; @Slf4j public abstract class OneWayReplicatorTestBase extends TestRetrySupport { @@ -140,10 +147,32 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { } protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception { + waitChangeEventsInit(defaultNamespace); admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Collections.singleton(cluster1)); admin1.namespaces().unload(defaultNamespace); cleanupTopicAction.run(); admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1, cluster2)); + waitChangeEventsInit(defaultNamespace); + } + + protected void waitChangeEventsInit(String namespace) { + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService() + .getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false) + .join().get(); + Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> { + TopicStatsImpl topicStats = topic.getStats(true, false, false); + topicStats.getSubscriptions().entrySet().forEach(entry -> { + // No wait for compaction. + if (COMPACTION_SUBSCRIPTION.equals(entry.getKey())) { + return; + } + // No wait for durable cursor. + if (entry.getValue().isDurable()) { + return; + } + Assert.assertTrue(entry.getValue().getMsgBacklog() == 0, entry.getKey()); + }); + }); } protected interface CleanupTopicAction { @@ -166,7 +195,7 @@ protected void setup() throws Exception { log.info("--- OneWayReplicatorTestBase::setup completed ---"); } - private void setConfigDefaults(ServiceConfiguration config, String clusterName, + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { config.setClusterName(clusterName); config.setAdvertisedAddress("localhost"); @@ -185,10 +214,19 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName, config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + config.setLoadBalancerSheddingEnabled(false); } @Override protected void cleanup() throws Exception { + // delete namespaces. + waitChangeEventsInit(defaultNamespace); + admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1)); + admin1.namespaces().deleteNamespace(defaultNamespace); + admin2.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster2)); + admin2.namespaces().deleteNamespace(defaultNamespace); + + // shutdown. markCurrentSetupNumberCleaned(); log.info("--- Shutting down ---"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index d5044276a5a63..de9d0272fc002 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1799,12 +1799,12 @@ public void testClosingReplicationProducerTwice() throws Exception { any(), eq(null) ); - replicator.disconnect(false); - replicator.disconnect(false); + replicator.terminate(); + replicator.terminate(); replicator.startProducer(); - verify(clientImpl, Mockito.times(2)).createProducerAsync(any(), any(), any()); + verify(clientImpl, Mockito.times(1)).createProducerAsync(any(), any(), any()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 88a668e8745d5..a05c3468ea16e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -895,7 +895,7 @@ public void testReplicatorProducerClosing() throws Exception { pulsar2 = null; pulsar3.close(); pulsar3 = null; - replicator.disconnect(false); + replicator.terminate(); Thread.sleep(100); Field field = AbstractReplicator.class.getDeclaredField("producer"); field.setAccessible(true); @@ -1834,7 +1834,7 @@ public void testReplicatorWithTTL() throws Exception { persistentTopic.getReplicators().forEach((cluster, replicator) -> { PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; // Pause replicator - persistentReplicator.disconnect(); + pauseReplicator(persistentReplicator); }); persistentProducer1.send("V2".getBytes()); @@ -1874,4 +1874,11 @@ public void testReplicatorWithTTL() throws Exception { assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4")); } + + private void pauseReplicator(PersistentReplicator replicator) { + Awaitility.await().untilAsserted(() -> { + assertTrue(replicator.isConnected()); + }); + replicator.closeProducerAsync(true); + } }