From 64e7400860cb8a888f341a8d7bf218ac28b64be8 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 29 Jan 2024 00:48:11 +0800 Subject: [PATCH 01/22] [fix] [broker] Part-1: Replicator can not created successfully due to an orphan replicator in the previous topic owner --- .../broker/service/AbstractReplicator.java | 78 ++++++++++++++----- .../NonPersistentReplicator.java | 2 +- .../persistent/PersistentReplicator.java | 6 +- 3 files changed, 62 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 1b5b2824257b0..07ddcdf92394d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -67,7 +67,16 @@ public abstract class AbstractReplicator { private volatile State state = State.Stopped; protected enum State { - Stopped, Starting, Started, Stopping + // The internal producer is stopped. + Stopped, + // Trying to create a new internal producer. + Starting, + // The internal producer has started, and tries copy data. + Started, + // The internal producer is trying to stop. + Stopping, + // The replicator is never used again. Pulsar will create a new Replicator when enable replication again. 
+ Closed } public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, @@ -124,8 +133,7 @@ public synchronized void startProducer() { replicatorId, waitTimeMs / 1000.0); } // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); return; } State state = STATE_UPDATER.get(this); @@ -150,10 +158,8 @@ public synchronized void startProducer() { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, waitTimeMs, - TimeUnit.MILLISECONDS); + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } else { log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId, STATE_UPDATER.get(this), ex); @@ -163,16 +169,38 @@ public synchronized void startProducer() { } - protected void checkTopicActiveAndRetryStartProducer() { - isLocalTopicActive().thenAccept(isTopicActive -> { - if (isTopicActive) { - startProducer(); + protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { + brokerService.executor().schedule(() -> { + if (state == State.Closed) { + return; } - }).exceptionally(ex -> { - log.warn("[{}] Stop retry to create producer due to topic load fail. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); - return null; - }); + CompletableFuture> topicFuture = brokerService.getTopics().get(localTopicName); + if (topicFuture == null) { + // Topic closed. + return; + } + topicFuture.thenAccept(optional -> { + if (optional.isEmpty()) { + // Topic closed. + return; + } + if (optional.get() != localTopic) { + // Topic closed and created a new one, current replicator is outdated. 
+ return; + } + // TODO check isClosing or Deleting. + Replicator replicator = localTopic.getReplicators().get(replicatorId); + if (replicator != AbstractReplicator.this) { + // Current replicator has been closed, and created a new one. + return; + } + startProducer(); + }).exceptionally(ex -> { + log.warn("[{}] [{}] Stop retry to create producer due to unknown error. Replicator state: {}", + localTopicName, replicatorId, STATE_UPDATER.get(this), ex); + return null; + }); + }, waitTimeMs, TimeUnit.MILLISECONDS); } protected CompletableFuture isLocalTopicActive() { @@ -188,14 +216,14 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } - protected synchronized CompletableFuture closeProducerAsync() { + protected synchronized CompletableFuture closeAsync(boolean onlyCloseProducer) { if (producer == null) { - STATE_UPDATER.set(this, State.Stopped); + updateStatus(onlyCloseProducer); return CompletableFuture.completedFuture(null); } CompletableFuture future = producer.closeAsync(); return future.thenRun(() -> { - STATE_UPDATER.set(this, State.Stopped); + updateStatus(onlyCloseProducer); this.producer = null; // deactivate further read disableReplicatorRead(); @@ -206,11 +234,21 @@ protected synchronized CompletableFuture closeProducerAsync() { + " retrying again in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying - brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(() -> closeAsync(onlyCloseProducer), + waitTimeMs, TimeUnit.MILLISECONDS); return null; }); } + protected void updateStatus(boolean onlyCloseProducer) { + if (onlyCloseProducer) { + // Only close producer. + STATE_UPDATER.set(this, State.Stopped); + } else { + // Close replicator. 
+ STATE_UPDATER.set(this, State.Closed); + } + } public CompletableFuture disconnect() { return disconnect(false); @@ -239,7 +277,7 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); } - return closeProducerAsync(); + return closeAsync(true); } public CompletableFuture remove() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 087c5f932008f..dccd2bb4f3575 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -79,7 +79,7 @@ protected void readEntries(Producer producer) { + " Closing it. Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); STATE_UPDATER.set(this, State.Stopping); - closeProducerAsync(); + closeAsync(false); return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 754d25b8b0ab4..a28770745c941 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -155,7 +155,7 @@ protected void readEntries(Producer producer) { + " Closing it. 
Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); STATE_UPDATER.set(this, State.Stopping); - closeProducerAsync(); + closeAsync(false); } } @@ -437,7 +437,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be stopped - closeProducerAsync(); + closeAsync(false); return; } else if (!(exception instanceof TooManyRequestsException)) { log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})", @@ -556,7 +556,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be stopped - closeProducerAsync(); + closeAsync(false); return; } if (ctx instanceof PositionImpl) { From 29b21b39b7a69e2bfcd8562320d4fee118ee210d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 29 Jan 2024 22:00:27 +0800 Subject: [PATCH 02/22] replicator.disconnect -> terminate --- .../broker/service/AbstractReplicator.java | 94 ++++++++++++++----- .../pulsar/broker/service/BrokerService.java | 2 +- .../pulsar/broker/service/Replicator.java | 4 +- .../NonPersistentReplicator.java | 3 +- .../nonpersistent/NonPersistentTopic.java | 10 +- .../persistent/PersistentReplicator.java | 19 ++-- .../service/persistent/PersistentTopic.java | 22 ++--- .../service/AbstractReplicatorTest.java | 2 +- .../broker/service/PersistentTopicTest.java | 4 +- .../pulsar/broker/service/ReplicatorTest.java | 4 +- 10 files changed, 105 insertions(+), 59 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 07ddcdf92394d..a0948fdfe4970 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -75,6 +75,8 @@ protected enum State { Started, // The internal producer is trying to stop. Stopping, + // The replicator is closing. + Closing, // The replicator is never used again. Pulsar will create a new Replicator when enable replication again. Closed } @@ -216,14 +218,15 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } - protected synchronized CompletableFuture closeAsync(boolean onlyCloseProducer) { + protected synchronized CompletableFuture closeProducerAsync() { if (producer == null) { - updateStatus(onlyCloseProducer); + tryChangeStatusToStopped(); return CompletableFuture.completedFuture(null); } + tryChangeStatusToStopping(); CompletableFuture future = producer.closeAsync(); return future.thenRun(() -> { - updateStatus(onlyCloseProducer); + tryChangeStatusToStopped(); this.producer = null; // deactivate further read disableReplicatorRead(); @@ -234,27 +237,40 @@ protected synchronized CompletableFuture closeAsync(boolean onlyCloseProdu + " retrying again in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying - brokerService.executor().schedule(() -> closeAsync(onlyCloseProducer), - waitTimeMs, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); return null; }); } - protected void updateStatus(boolean onlyCloseProducer) { - if (onlyCloseProducer) { - // Only close producer. - STATE_UPDATER.set(this, State.Stopped); - } else { - // Close replicator. 
+ protected synchronized CompletableFuture terminateInternal() { + if (producer == null) { STATE_UPDATER.set(this, State.Closed); + return CompletableFuture.completedFuture(null); } + CompletableFuture future = producer.closeAsync(); + return future.thenRun(() -> { + STATE_UPDATER.set(this, State.Closed); + this.producer = null; + // set the cursor as inactive. + disableReplicatorRead(); + }).exceptionally(ex -> { + long waitTimeMs = backOff.next(); + log.warn( + "[{}] Exception: '{}' occurred while trying to terminate the replicator." + + " retrying again in {} s", + replicatorId, ex.getMessage(), waitTimeMs / 1000.0); + // BackOff before retrying + brokerService.executor().schedule(() -> terminateInternal(), + waitTimeMs, TimeUnit.MILLISECONDS); + return null; + }); } - public CompletableFuture disconnect() { - return disconnect(false); + public CompletableFuture terminate() { + return terminate(false); } - public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { + public synchronized CompletableFuture terminate(boolean failIfHasBacklog) { if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) { CompletableFuture disconnectFuture = new CompletableFuture<>(); disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); @@ -264,20 +280,52 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) return disconnectFuture; } - if (STATE_UPDATER.get(this) == State.Stopping) { - // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by - // closeProducerAsync() - // which will at some point change the state to stopped + log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, + getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); + if (!tryChangeStatusToClosing()) { + // The replicator has been called "terminate" before, just return success. 
return CompletableFuture.completedFuture(null); } + return terminateInternal(); + } - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) - || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) { - log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, - getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); + protected boolean tryChangeStatusToClosing() { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Closing)){ + return true; } + if (STATE_UPDATER.compareAndSet(this, State.Started, State.Closing)){ + return true; + } + if (STATE_UPDATER.compareAndSet(this, State.Stopping, State.Closing)){ + return true; + } + if (STATE_UPDATER.compareAndSet(this, State.Stopped, State.Closing)) { + return true; + } + return false; + } - return closeAsync(true); + protected boolean tryChangeStatusToStopping() { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)){ + return true; + } + if (STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)){ + return true; + } + return false; + } + + protected boolean tryChangeStatusToStopped() { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)){ + return true; + } + if (STATE_UPDATER.compareAndSet(this, State.Started, State.Stopped)){ + return true; + } + if (STATE_UPDATER.compareAndSet(this, State.Stopping, State.Stopped)){ + return true; + } + return false; } public CompletableFuture remove() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2687532693a45..80930901ca6c4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -742,7 +742,7 @@ public CompletableFuture closeAndRemoveReplicationClient(String clusterNam if (ot.isPresent()) { Replicator r = 
ot.get().getReplicators().get(clusterName); if (r != null && r.isConnected()) { - r.disconnect(false).whenComplete((v, e) -> f.complete(null)); + r.terminate(false).whenComplete((v, e) -> f.complete(null)); return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 482fa2cbd2300..3be5623c4d70c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -29,9 +29,9 @@ public interface Replicator { ReplicatorStatsImpl getStats(); - CompletableFuture disconnect(); + CompletableFuture terminate(); - CompletableFuture disconnect(boolean b); + CompletableFuture terminate(boolean b); void updateRates(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index dccd2bb4f3575..552d75d460ab8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -78,8 +78,7 @@ protected void readEntries(Producer producer) { "[{}] Replicator was stopped while creating the producer." + " Closing it. 
Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); - STATE_UPDATER.set(this, State.Stopping); - closeAsync(false); + closeProducerAsync(); return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 0ac06d6883ff1..8463b9b0d2f9c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -420,7 +420,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c CompletableFuture closeClientFuture = new CompletableFuture<>(); if (closeIfClientsConnected) { List> futures = new ArrayList<>(); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); producers.values().forEach(producer -> futures.add(producer.disconnect())); subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty()))); FutureUtil.waitForAll(futures).thenRun(() -> { @@ -523,7 +523,7 @@ public CompletableFuture close( List> futures = new ArrayList<>(); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { @@ -582,7 +582,7 @@ public CompletableFuture close( public CompletableFuture stopReplProducers() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); return 
FutureUtil.waitForAll(closeFutures); } @@ -662,7 +662,7 @@ CompletableFuture removeReplicator(String remoteCluster) { String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - replicators.get(remoteCluster).disconnect().thenRun(() -> { + replicators.get(remoteCluster).terminate().thenRun(() -> { log.info("[{}] Successfully removed replicator {}", name, remoteCluster); replicators.remove(remoteCluster); @@ -1031,7 +1031,7 @@ private CompletableFuture disconnectReplicators() { List> futures = new ArrayList<>(); ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { - futures.add(replicator.disconnect()); + futures.add(replicator.terminate()); }); return FutureUtil.waitForAll(futures); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index a28770745c941..7a2fa3f6d350d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -154,8 +154,7 @@ protected void readEntries(Producer producer) { "[{}] Replicator was stopped while creating the producer." + " Closing it. 
Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); - STATE_UPDATER.set(this, State.Stopping); - closeAsync(false); + closeProducerAsync(); } } @@ -436,8 +435,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Error reading entries because replicator is" + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); - // replicator is already deleted and cursor is already closed so, producer should also be stopped - closeAsync(false); + // replicator is already deleted and cursor is already closed so, producer should also be stopped. + closeProducerAsync(); return; } else if (!(exception instanceof TooManyRequestsException)) { log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})", @@ -555,8 +554,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { if (exception instanceof CursorAlreadyClosedException) { log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); - // replicator is already deleted and cursor is already closed so, producer should also be stopped - closeAsync(false); + // replicator is already deleted and cursor is already closed so, producer should also be stopped. 
+ closeProducerAsync(); return; } if (ctx instanceof PositionImpl) { @@ -676,15 +675,15 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl< } @Override - public CompletableFuture disconnect() { - return disconnect(false); + public CompletableFuture terminate() { + return terminate(false); } @Override - public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { + public synchronized CompletableFuture terminate(boolean failIfHasBacklog) { final CompletableFuture future = new CompletableFuture<>(); - super.disconnect(failIfHasBacklog).thenRun(() -> { + super.terminate(failIfHasBacklog).thenRun(() -> { dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); future.complete(null); }).exceptionally(ex -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e4441969101c1..706cc2fbae6c5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -833,15 +833,15 @@ public CompletableFuture startReplProducers() { public CompletableFuture stopReplProducers() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect())); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate())); return FutureUtil.waitForAll(closeFutures); } private synchronized CompletableFuture closeReplProducersIfNoBacklog() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true))); - 
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true))); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate(true))); + shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate(true))); return FutureUtil.waitForAll(closeFutures); } @@ -1423,8 +1423,8 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, List> futures = new ArrayList<>(); subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty()))); if (closeIfClientsConnected) { - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); producers.values().forEach(producer -> futures.add(producer.disconnect())); } FutureUtil.waitForAll(futures).thenRunAsync(() -> { @@ -1565,8 +1565,8 @@ public CompletableFuture close( List> futures = new ArrayList<>(); futures.add(transactionBuffer.closeAsync()); - replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); + replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate())); + shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate())); if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> { @@ -1941,7 +1941,7 @@ CompletableFuture removeReplicator(String remoteCluster) { String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster); - Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect) + Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::terminate) 
.orElse(CompletableFuture.completedFuture(null)).thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override @@ -2013,7 +2013,7 @@ CompletableFuture removeShadowReplicator(String shadowTopic) { log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic); final CompletableFuture future = new CompletableFuture<>(); String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic); - shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> { + shadowReplicators.get(shadowTopic).terminate().thenRun(() -> { ledger.asyncDeleteCursor(name, new DeleteCursorCallback() { @Override @@ -2897,7 +2897,7 @@ private CompletableFuture checkAndDisconnectReplicators() { ConcurrentOpenHashMap replicators = getReplicators(); replicators.forEach((r, replicator) -> { if (replicator.getNumberOfEntriesInBacklog() <= 0) { - futures.add(replicator.disconnect()); + futures.add(replicator.terminate()); } }); return FutureUtil.waitForAll(futures); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 8699c73246830..7bfd89eeda290 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -94,7 +94,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, replicatorPrefix, broker, remoteClient); replicator.startProducer(); - replicator.disconnect(); + replicator.terminate(); // Verify task will done. 
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index d5044276a5a63..4794a7a5f284e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1799,8 +1799,8 @@ public void testClosingReplicationProducerTwice() throws Exception { any(), eq(null) ); - replicator.disconnect(false); - replicator.disconnect(false); + replicator.terminate(false); + replicator.terminate(false); replicator.startProducer(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 88a668e8745d5..425dd6b91b5e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -895,7 +895,7 @@ public void testReplicatorProducerClosing() throws Exception { pulsar2 = null; pulsar3.close(); pulsar3 = null; - replicator.disconnect(false); + replicator.terminate(false); Thread.sleep(100); Field field = AbstractReplicator.class.getDeclaredField("producer"); field.setAccessible(true); @@ -1834,7 +1834,7 @@ public void testReplicatorWithTTL() throws Exception { persistentTopic.getReplicators().forEach((cluster, replicator) -> { PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; // Pause replicator - persistentReplicator.disconnect(); + persistentReplicator.terminate(); }); persistentProducer1.send("V2".getBytes()); From 6407f68494584fdbfaf4edb798cd497b804b6132 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Mon, 29 Jan 2024 23:34:24 +0800 Subject: [PATCH 03/22] rename status close -> terminate --- 
.../broker/service/AbstractReplicator.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index a0948fdfe4970..c2264d0d809e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -75,10 +75,10 @@ protected enum State { Started, // The internal producer is trying to stop. Stopping, - // The replicator is closing. - Closing, + // The replicator is in terminating. + Terminating, // The replicator is never used again. Pulsar will create a new Replicator when enable replication again. - Closed + Terminated } public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, @@ -173,7 +173,7 @@ public synchronized void startProducer() { protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { brokerService.executor().schedule(() -> { - if (state == State.Closed) { + if (state == State.Terminated) { return; } CompletableFuture> topicFuture = brokerService.getTopics().get(localTopicName); @@ -244,12 +244,12 @@ protected synchronized CompletableFuture closeProducerAsync() { protected synchronized CompletableFuture terminateInternal() { if (producer == null) { - STATE_UPDATER.set(this, State.Closed); + STATE_UPDATER.set(this, State.Terminated); return CompletableFuture.completedFuture(null); } CompletableFuture future = producer.closeAsync(); return future.thenRun(() -> { - STATE_UPDATER.set(this, State.Closed); + STATE_UPDATER.set(this, State.Terminated); this.producer = null; // set the cursor as inactive. 
disableReplicatorRead(); @@ -282,24 +282,24 @@ public synchronized CompletableFuture terminate(boolean failIfHasBacklog) log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); - if (!tryChangeStatusToClosing()) { + if (!tryChangeStatusToTerminating()) { // The replicator has been called "terminate" before, just return success. return CompletableFuture.completedFuture(null); } return terminateInternal(); } - protected boolean tryChangeStatusToClosing() { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Closing)){ + protected boolean tryChangeStatusToTerminating() { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){ return true; } - if (STATE_UPDATER.compareAndSet(this, State.Started, State.Closing)){ + if (STATE_UPDATER.compareAndSet(this, State.Started, State.Terminating)){ return true; } - if (STATE_UPDATER.compareAndSet(this, State.Stopping, State.Closing)){ + if (STATE_UPDATER.compareAndSet(this, State.Stopping, State.Terminating)){ return true; } - if (STATE_UPDATER.compareAndSet(this, State.Stopped, State.Closing)) { + if (STATE_UPDATER.compareAndSet(this, State.Stopped, State.Terminating)) { return true; } return false; From 55711a1e3079ca8f54c8abcb4c407eeb790febb6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 31 Jan 2024 15:11:02 +0800 Subject: [PATCH 04/22] reset --- .../broker/service/OneWayReplicatorTest.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 1accd04f4918c..e2f2c69170af3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -153,4 +153,34 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + + @Test + public void testTopicCloseWhenInternalProducerCloseErrorOnce1() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicName); + // Wait for replicator started. + waitReplicatorStarted(topicName); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); + PersistentReplicator replicator = + (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + // Mock an error when calling "replicator.disconnect()" + ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class); + Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex"))); + ProducerImpl originalProducer = overrideProducerForReplicator(replicator, mockProducer); + // Verify: since the "replicator.producer.closeAsync()" will retry after it failed, the topic unload should be + // successful. + admin1.topics().unload(topicName); + // Verify: After "replicator.producer.closeAsync()" retry again, the "replicator.producer" will be closed + // successful. + overrideProducerForReplicator(replicator, originalProducer); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse(replicator.isConnected()); + }); + // cleanup. 
+ cleanupTopics(() -> { + admin1.topics().delete(topicName); + admin2.topics().delete(topicName); + }); + } } From 24409a2f4c586cbd6eed2185e12043061d8716f6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 00:50:07 +0800 Subject: [PATCH 05/22] fix bug --- .../apache/pulsar/broker/service/AbstractReplicator.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index c2264d0d809e5..9765d45a14f51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -18,10 +18,12 @@ */ package org.apache.pulsar.broker.service; +import com.google.common.annotations.VisibleForTesting; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import lombok.Getter; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -64,9 +66,11 @@ public abstract class AbstractReplicator { protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); + @VisibleForTesting + @Getter private volatile State state = State.Stopped; - protected enum State { + public enum State { // The internal producer is stopped. Stopped, // Trying to create a new internal producer. @@ -191,7 +195,7 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { return; } // TODO check isClosing or Deleting. 
- Replicator replicator = localTopic.getReplicators().get(replicatorId); + Replicator replicator = localTopic.getReplicators().get(remoteCluster); if (replicator != AbstractReplicator.this) { // Current replicator has been closed, and created a new one. return; From 80dcfcd7676dbfa6285c65d433169b4c3b9b2a79 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 00:50:38 +0800 Subject: [PATCH 06/22] add test --- .../broker/service/OneWayReplicatorTest.java | 218 ++++++++++++++++-- .../service/OneWayReplicatorTestBase.java | 2 +- 2 files changed, 206 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index e2f2c69170af3..d54dbf0885f1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -18,28 +18,54 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import 
org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ProducerBuilderImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class OneWayReplicatorTest extends OneWayReplicatorTestBase { @@ -136,7 +162,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); // Mock an error when calling "replicator.disconnect()" ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class); - Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex"))); + when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex"))); ProducerImpl originalProducer = overrideProducerForReplicator(replicator, mockProducer); // Verify: since the "replicator.producer.closeAsync()" 
will retry after it failed, the topic unload should be // successful. @@ -154,29 +180,195 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception }); } + private void injectMockReplicatorProducerBuilder( + BiFunction producerDecorator) + throws Exception { + String cluster2 = pulsar2.getConfig().getClusterName(); + BrokerService brokerService = pulsar1.getBrokerService(); + // Wait for the internal client created. + // the topic "__change_event" will trigger it created. +// Awaitility.await().untilAsserted(() -> { +// ConcurrentOpenHashMap +// replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients"); +// PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); +// assertNotNull(internalClient); +// }); + final String topicNameTriggerInternalClientCreate = + BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); + admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate); + waitReplicatorStarted(topicNameTriggerInternalClientCreate); + cleanupTopics(() -> { + admin1.topics().delete(topicNameTriggerInternalClientCreate); + admin2.topics().delete(topicNameTriggerInternalClientCreate); + }); + + // Inject spy client. + ConcurrentOpenHashMap + replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients"); + PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); + PulsarClient spyClient = spy(internalClient); + replicationClients.put(cluster2, spyClient); + + // Inject producer decorator. 
+ doAnswer(invocation -> { + Schema schema = (Schema) invocation.getArguments()[0]; + ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) internalClient.newProducer(schema); + ProducerBuilder spyProducerBuilder = spy(producerBuilder); + doAnswer(ignore -> { + CompletableFuture producerFuture = new CompletableFuture<>(); + final ProducerImpl p = (ProducerImpl) producerBuilder.create(); + new FastThreadLocalThread(() -> { + try { + ProducerImpl newProducer = producerDecorator.apply(producerBuilder.getConf(), p); + producerFuture.complete(newProducer); + } catch (Exception ex) { + producerFuture.completeExceptionally(ex); + } + }).start(); + return producerFuture; + }).when(spyProducerBuilder).createAsync(); + return spyProducerBuilder; + }).when(spyClient).newProducer(any(Schema.class)); + } + + private SpyCursor spyCursor(PersistentTopic persistentTopic, String cursorName) throws Exception { + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(cursorName); + ManagedCursorImpl spyCursor = spy(cursor); + // remove cursor. + ml.getCursors().removeCursor(cursorName); + ml.deactivateCursor(cursor); + // Add the spy one. 
addCursor(ManagedCursorImpl cursor) + Method m = ManagedLedgerImpl.class.getDeclaredMethod("addCursor", new Class[]{ManagedCursorImpl.class}); + m.setAccessible(true); + m.invoke(ml, new Object[]{spyCursor}); + return new SpyCursor(cursor, spyCursor); + } + + @Data + @AllArgsConstructor + static class SpyCursor { + ManagedCursorImpl original; + ManagedCursorImpl spy; + } + + private CursorCloseSignal makeCursorClosingDelay(SpyCursor spyCursor) throws Exception { + CountDownLatch startCloseSignal = new CountDownLatch(1); + CountDownLatch startCallbackSignal = new CountDownLatch(1); + doAnswer(invocation -> { + AsyncCallbacks.CloseCallback originalCallback = (AsyncCallbacks.CloseCallback) invocation.getArguments()[0]; + Object ctx = invocation.getArguments()[1]; + AsyncCallbacks.CloseCallback newCallback = new AsyncCallbacks.CloseCallback() { + @Override + public void closeComplete(Object ctx) { + new FastThreadLocalThread(new Runnable() { + @Override + @SneakyThrows + public void run() { + startCallbackSignal.await(); + originalCallback.closeComplete(ctx); + } + }).start(); + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + new FastThreadLocalThread(new Runnable() { + @Override + @SneakyThrows + public void run() { + startCallbackSignal.await(); + originalCallback.closeFailed(exception, ctx); + } + }).start(); + } + }; + startCloseSignal.await(); + spyCursor.original.asyncClose(newCallback, ctx); + return null; + }).when(spyCursor.spy).asyncClose(any(AsyncCallbacks.CloseCallback.class), any()); + return new CursorCloseSignal(startCloseSignal, startCallbackSignal); + } + + @AllArgsConstructor + static class CursorCloseSignal { + CountDownLatch startCloseSignal; + CountDownLatch startCallbackSignal; + + void startClose() { + startCloseSignal.countDown(); + } + + void startCallback() { + startCallbackSignal.countDown(); + } + } + + /** + * See the description and execution flow: https://github.com/apache/pulsar/pull/21946. 
+ */ @Test public void testTopicCloseWhenInternalProducerCloseErrorOnce1() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); + // Inject an error for "replicator.producer" creation. + // The delay time of next retry to create producer is below: + // 0.1s, 0.2, 0.4, 0.8, 1.6s, 3.2s, 6.4s... + // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. + final AtomicInteger createProducerCounter = new AtomicInteger(); + final int failTimes = 6; + injectMockReplicatorProducerBuilder((producerCnf, orginalProducer) -> { + if (topicName.equals(producerCnf.getTopicName())) { + // There is a switch to determine create producer successfully or not. + if (createProducerCounter.incrementAndGet() > failTimes) { + return orginalProducer; + } + log.info("===> " + createProducerCounter); + // Release producer and fail callback. + orginalProducer.closeAsync(); + throw new RuntimeException("mock error"); + } + return orginalProducer; + }); + + // Create topic. admin1.topics().createNonPartitionedTopic(topicName); - // Wait for replicator started. - waitReplicatorStarted(topicName); PersistentTopic persistentTopic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); PersistentReplicator replicator = (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); - // Mock an error when calling "replicator.disconnect()" - ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class); - Mockito.when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex"))); - ProducerImpl originalProducer = overrideProducerForReplicator(replicator, mockProducer); - // Verify: since the "replicator.producer.closeAsync()" will retry after it failed, the topic unload should be - // successful. 
- admin1.topics().unload(topicName); - // Verify: After "replicator.producer.closeAsync()" retry again, the "replicator.producer" will be closed - // successful. - overrideProducerForReplicator(replicator, originalProducer); + // Since we inject a producer creation error, the replicator can not start successfully. + assertFalse(replicator.isConnected()); + + // Stuck the closing of the cursor("pulsar.repl"), until the internal producer of the replicator started. + SpyCursor spyCursor = + spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName()); + CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor); + + // Stuck start new producer, until the state of replicator change to Stopped. + // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully. + Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> { + assertTrue(createProducerCounter.get() >= failTimes); + }); + CompletableFuture topicCloseFuture = admin1.topics().unloadAsync(topicName); + Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { + assertTrue(AbstractReplicator.State.Stopped.equals(replicator.getState()) + || AbstractReplicator.State.Terminated.equals(replicator.getState())); + }); + + // Delay close cursor, until "replicator.producer" create successfully. + // The next once retry time of create "replicator.producer" will be 3.2s. + Thread.sleep(4 * 1000); + cursorCloseSignal.startClose(); + cursorCloseSignal.startCallback(); + + // Wait for topic close successfully. + // Verify there is no orphan producer on the remote cluster. + topicCloseFuture.join(); Awaitility.await().untilAsserted(() -> { Assert.assertFalse(replicator.isConnected()); }); + assertEquals(admin2.topics().getStats(topicName).getPublishers().size(), 0); + // cleanup. 
cleanupTopics(() -> { admin1.topics().delete(topicName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 33620716288af..8a8dd6f948d31 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -166,7 +166,7 @@ protected void setup() throws Exception { log.info("--- OneWayReplicatorTestBase::setup completed ---"); } - private void setConfigDefaults(ServiceConfiguration config, String clusterName, + protected void setConfigDefaults(ServiceConfiguration config, String clusterName, LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) { config.setClusterName(clusterName); config.setAdvertisedAddress("localhost"); From 5c2b503d4b014057a64beadcc519b969ef746781 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 01:05:48 +0800 Subject: [PATCH 07/22] improve test --- .../broker/service/OneWayReplicatorTest.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index d54dbf0885f1f..10137b6b28295 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -322,7 +322,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce1() throws Exception if (createProducerCounter.incrementAndGet() > failTimes) { return orginalProducer; } - log.info("===> " + createProducerCounter); + log.info("Retry create replicator.producer count: {}", createProducerCounter); // Release producer and fail callback. 
orginalProducer.closeAsync(); throw new RuntimeException("mock error"); @@ -349,25 +349,28 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce1() throws Exception Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> { assertTrue(createProducerCounter.get() >= failTimes); }); - CompletableFuture topicCloseFuture = admin1.topics().unloadAsync(topicName); + CompletableFuture topicCloseFuture = persistentTopic.close(true); Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { - assertTrue(AbstractReplicator.State.Stopped.equals(replicator.getState()) - || AbstractReplicator.State.Terminated.equals(replicator.getState())); + String state = String.valueOf(replicator.getState()); + assertTrue(state.equals("Stopped") || state.equals("Terminated")); }); // Delay close cursor, until "replicator.producer" create successfully. // The next once retry time of create "replicator.producer" will be 3.2s. Thread.sleep(4 * 1000); + log.info("Replicator.state: {}", replicator.getState()); cursorCloseSignal.startClose(); cursorCloseSignal.startCallback(); // Wait for topic close successfully. // Verify there is no orphan producer on the remote cluster. topicCloseFuture.join(); - Awaitility.await().untilAsserted(() -> { + Awaitility.await().pollInterval(Duration.ofSeconds(1)).untilAsserted(() -> { + PersistentTopic persistentTopic2 = + (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get(); + assertEquals(persistentTopic2.getProducers().size(), 0); Assert.assertFalse(replicator.isConnected()); }); - assertEquals(admin2.topics().getStats(topicName).getPublishers().size(), 0); // cleanup. 
cleanupTopics(() -> { From e7e09020fe4ad5c8479d299f35ccbea6af28c84b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 01:07:45 +0800 Subject: [PATCH 08/22] improve test --- .../org/apache/pulsar/broker/service/OneWayReplicatorTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 10137b6b28295..334e122d2cd3b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -308,7 +308,7 @@ void startCallback() { * See the description and execution flow: https://github.com/apache/pulsar/pull/21946. */ @Test - public void testTopicCloseWhenInternalProducerCloseErrorOnce1() throws Exception { + public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); // Inject an error for "replicator.producer" creation. // The delay time of next retry to create producer is below: @@ -344,6 +344,7 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce1() throws Exception spyCursor(persistentTopic, "pulsar.repl." + pulsar2.getConfig().getClusterName()); CursorCloseSignal cursorCloseSignal = makeCursorClosingDelay(spyCursor); + // Unload bundle: call "topic.close(false)". // Stuck start new producer, until the state of replicator change to Stopped. // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully. 
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> { From ad828ea81230b8017babf46b3db95bd1258f288c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 01:13:36 +0800 Subject: [PATCH 09/22] remove unnecessary code --- .../apache/pulsar/broker/service/OneWayReplicatorTest.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 334e122d2cd3b..68768a70704f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -186,13 +186,6 @@ private void injectMockReplicatorProducerBuilder( String cluster2 = pulsar2.getConfig().getClusterName(); BrokerService brokerService = pulsar1.getBrokerService(); // Wait for the internal client created. - // the topic "__change_event" will trigger it created. 
-// Awaitility.await().untilAsserted(() -> { -// ConcurrentOpenHashMap -// replicationClients = WhiteboxImpl.getInternalState(brokerService, "replicationClients"); -// PulsarClientImpl internalClient = (PulsarClientImpl) replicationClients.get(cluster2); -// assertNotNull(internalClient); -// }); final String topicNameTriggerInternalClientCreate = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); admin1.topics().createNonPartitionedTopic(topicNameTriggerInternalClientCreate); From c9e5effb92421a7e289abc235e60f682de8db6cc Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 01:18:23 +0800 Subject: [PATCH 10/22] improve test --- .../apache/pulsar/broker/service/OneWayReplicatorTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 68768a70704f7..45e10f39cdf74 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -299,6 +299,12 @@ void startCallback() { /** * See the description and execution flow: https://github.com/apache/pulsar/pull/21946. + * Steps: + * - Create the topic, but the creation of the Replicator's internal producer fails. + * - Unload the bundle: the Replicator is closed, but the internal producer creation retry has not run yet. + * - The internal producer creation retry succeeds while the "repl.cursor" has not been closed yet. + * - The topic is completely closed. + * - Verify: the internal producer that was created late will be closed.
*/ @Test public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception { From cdedb5b5980051c887a7072c2ddc83ac31e5816b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 10:57:51 +0800 Subject: [PATCH 11/22] address comment --- .../pulsar/broker/service/OneWayReplicatorTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 45e10f39cdf74..4f051ff90e801 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -315,18 +315,18 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception // If the retry counter is larger than 6, the next creation will be slow enough to close Replicator. final AtomicInteger createProducerCounter = new AtomicInteger(); final int failTimes = 6; - injectMockReplicatorProducerBuilder((producerCnf, orginalProducer) -> { + injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> { if (topicName.equals(producerCnf.getTopicName())) { // There is a switch to determine create producer successfully or not. if (createProducerCounter.incrementAndGet() > failTimes) { - return orginalProducer; + return originalProducer; } log.info("Retry create replicator.producer count: {}", createProducerCounter); // Release producer and fail callback. - orginalProducer.closeAsync(); + originalProducer.closeAsync(); throw new RuntimeException("mock error"); } - return orginalProducer; + return originalProducer; }); // Create topic. 
From febf6f0e868e09d598010949c0da38fee895fc0f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 1 Feb 2024 12:31:33 +0800 Subject: [PATCH 12/22] add a method disconnect to calling close producer --- .../broker/service/AbstractReplicator.java | 50 +++++++++---------- .../pulsar/broker/service/BrokerService.java | 2 +- .../pulsar/broker/service/Replicator.java | 2 +- .../persistent/PersistentReplicator.java | 24 --------- .../service/persistent/PersistentTopic.java | 4 +- .../service/AbstractReplicatorTest.java | 18 ++++++- .../broker/service/PersistentTopicTest.java | 4 +- .../pulsar/broker/service/ReplicatorTest.java | 2 +- 8 files changed, 47 insertions(+), 59 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 9765d45a14f51..39b55314890fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -41,7 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractReplicator { +public abstract class AbstractReplicator implements Replicator { protected final BrokerService brokerService; protected final String localTopicName; @@ -120,7 +120,7 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl protected abstract Position getReplicatorReadPosition(); - protected abstract long getNumberOfEntriesInBacklog(); + public abstract long getNumberOfEntriesInBacklog(); protected abstract void disableReplicatorRead(); @@ -222,6 +222,25 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } + public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { + if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) { + CompletableFuture disconnectFuture = new CompletableFuture<>(); + 
disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); + if (log.isDebugEnabled()) { + log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); + } + return disconnectFuture; + } + + log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, + getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); + if (!tryChangeStatusToTerminating()) { + // The replicator has been called "terminate" before, just return success. + return CompletableFuture.completedFuture(null); + } + return closeProducerAsync(); + } + protected synchronized CompletableFuture closeProducerAsync() { if (producer == null) { tryChangeStatusToStopped(); @@ -246,7 +265,7 @@ protected synchronized CompletableFuture closeProducerAsync() { }); } - protected synchronized CompletableFuture terminateInternal() { + public synchronized CompletableFuture terminate() { if (producer == null) { STATE_UPDATER.set(this, State.Terminated); return CompletableFuture.completedFuture(null); @@ -264,35 +283,12 @@ protected synchronized CompletableFuture terminateInternal() { + " retrying again in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying - brokerService.executor().schedule(() -> terminateInternal(), + brokerService.executor().schedule(() -> terminate(), waitTimeMs, TimeUnit.MILLISECONDS); return null; }); } - public CompletableFuture terminate() { - return terminate(false); - } - - public synchronized CompletableFuture terminate(boolean failIfHasBacklog) { - if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) { - CompletableFuture disconnectFuture = new CompletableFuture<>(); - disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); - if (log.isDebugEnabled()) { - log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); - } - return disconnectFuture; - } - - log.info("[{}] Disconnect 
replicator at position {} with backlog {}", replicatorId, - getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); - if (!tryChangeStatusToTerminating()) { - // The replicator has been called "terminate" before, just return success. - return CompletableFuture.completedFuture(null); - } - return terminateInternal(); - } - protected boolean tryChangeStatusToTerminating() { if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Terminating)){ return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 80930901ca6c4..1212399cf5f5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -742,7 +742,7 @@ public CompletableFuture closeAndRemoveReplicationClient(String clusterNam if (ot.isPresent()) { Replicator r = ot.get().getReplicators().get(clusterName); if (r != null && r.isConnected()) { - r.terminate(false).whenComplete((v, e) -> f.complete(null)); + r.terminate().whenComplete((v, e) -> f.complete(null)); return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 3be5623c4d70c..4a8cbe0ca2b74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -31,7 +31,7 @@ public interface Replicator { CompletableFuture terminate(); - CompletableFuture terminate(boolean b); + CompletableFuture disconnect(boolean failIfHasBacklog); void updateRates(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 
7a2fa3f6d350d..51bd49e83f181 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -674,30 +674,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl< } } - @Override - public CompletableFuture terminate() { - return terminate(false); - } - - @Override - public synchronized CompletableFuture terminate(boolean failIfHasBacklog) { - final CompletableFuture future = new CompletableFuture<>(); - - super.terminate(failIfHasBacklog).thenRun(() -> { - dispatchRateLimiter.ifPresent(DispatchRateLimiter::close); - future.complete(null); - }).exceptionally(ex -> { - Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); - if (!(t instanceof TopicBusyException)) { - log.error("[{}] Failed to close dispatch rate limiter: {}", replicatorId, ex.getMessage()); - } - future.completeExceptionally(t); - return null; - }); - - return future; - } - @Override public boolean isConnected() { ProducerImpl producer = this.producer; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 706cc2fbae6c5..7a52aaaee463a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -840,8 +840,8 @@ public CompletableFuture stopReplProducers() { private synchronized CompletableFuture closeReplProducersIfNoBacklog() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate(true))); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate(true))); + replicators.forEach((region, replicator) -> 
closeFutures.add(replicator.disconnect(true))); + shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true))); return FutureUtil.waitForAll(closeFutures); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 7bfd89eeda290..5b678dfeeac20 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -139,7 +140,22 @@ protected Position getReplicatorReadPosition() { } @Override - protected long getNumberOfEntriesInBacklog() { + public ReplicatorStatsImpl getStats() { + return null; + } + + @Override + public void updateRates() { + + } + + @Override + public boolean isConnected() { + return false; + } + + @Override + public long getNumberOfEntriesInBacklog() { return 0; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 4794a7a5f284e..24eecdabf7538 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1799,8 +1799,8 @@ public void testClosingReplicationProducerTwice() throws Exception { any(), eq(null) ); - replicator.terminate(false); - replicator.terminate(false); + 
replicator.terminate(); + replicator.terminate(); replicator.startProducer(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 425dd6b91b5e6..82c6ed2160346 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -895,7 +895,7 @@ public void testReplicatorProducerClosing() throws Exception { pulsar2 = null; pulsar3.close(); pulsar3 = null; - replicator.terminate(false); + replicator.terminate(); Thread.sleep(100); Field field = AbstractReplicator.class.getDeclaredField("producer"); field.setAccessible(true); From 5074e7ad81ca7ca0caf1e89c7d3b0adecc0b9d7f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 2 Feb 2024 17:04:01 +0800 Subject: [PATCH 13/22] address comments --- .../broker/service/AbstractReplicator.java | 117 ++++++++++-------- 1 file changed, 64 insertions(+), 53 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 39b55314890fa..979aced3c7a9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -142,7 +142,6 @@ public synchronized void startProducer() { scheduleCheckTopicActiveAndStartProducer(waitTimeMs); return; } - State state = STATE_UPDATER.get(this); if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { if (state == State.Started) { // Already running @@ -150,7 +149,7 @@ public synchronized void startProducer() { log.debug("[{}] Replicator was already running", replicatorId); } } else { - log.info("[{}] Replicator already being started. 
Replicator state: {}", replicatorId, state); + log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state); } return; @@ -177,7 +176,9 @@ public synchronized void startProducer() { protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { brokerService.executor().schedule(() -> { - if (state == State.Terminated) { + if (state == State.Terminating || state == State.Terminated) { + log.info("[{}] Skip scheduled to start the producer since the replicator state is : {}", + replicatorId, state); return; } CompletableFuture> topicFuture = brokerService.getTopics().get(localTopicName); @@ -188,16 +189,19 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { topicFuture.thenAccept(optional -> { if (optional.isEmpty()) { // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed.", replicatorId); return; } if (optional.get() != localTopic) { // Topic closed and created a new one, current replicator is outdated. + log.info("[{}] Skip scheduled to start the producer since the topic was closed.", replicatorId); return; } - // TODO check isClosing or Deleting. Replicator replicator = localTopic.getReplicators().get(remoteCluster); if (replicator != AbstractReplicator.this) { // Current replicator has been closed, and created a new one. 
+ log.info("[{}] Skip scheduled to start the producer since a new replicator has instead current" + + " one.", replicatorId); return; } startProducer(); @@ -223,7 +227,8 @@ protected CompletableFuture isLocalTopicActive() { } public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { - if (failIfHasBacklog && getNumberOfEntriesInBacklog() > 0) { + long backlog = getNumberOfEntriesInBacklog(); + if (failIfHasBacklog && backlog > 0) { CompletableFuture disconnectFuture = new CompletableFuture<>(); disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); if (log.isDebugEnabled()) { @@ -231,62 +236,68 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) } return disconnectFuture; } - log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, - getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); - if (!tryChangeStatusToTerminating()) { - // The replicator has been called "terminate" before, just return success. 
- return CompletableFuture.completedFuture(null); - } + getReplicatorReadPosition(), backlog); return closeProducerAsync(); } - protected synchronized CompletableFuture closeProducerAsync() { - if (producer == null) { - tryChangeStatusToStopped(); - return CompletableFuture.completedFuture(null); + protected CompletableFuture closeProducerAsync() { + if (!tryChangeStatusToStopping()) { + log.info("[{}] Skip current termination since other thread is doing close producer or termination," + + " state : {}", replicatorId, state); + } + synchronized (this) { + if (producer == null) { + tryChangeStatusToStopped(); + return CompletableFuture.completedFuture(null); + } + CompletableFuture future = producer.closeAsync(); + return future.thenRun(() -> { + tryChangeStatusToStopped(); + this.producer = null; + // deactivate further read + disableReplicatorRead(); + }).exceptionally(ex -> { + long waitTimeMs = backOff.next(); + log.warn( + "[{}] Exception: '{}' occurred while trying to close the producer." + + " retrying again in {} s", + replicatorId, ex.getMessage(), waitTimeMs / 1000.0); + // BackOff before retrying + brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); + return null; + }); } - tryChangeStatusToStopping(); - CompletableFuture future = producer.closeAsync(); - return future.thenRun(() -> { - tryChangeStatusToStopped(); - this.producer = null; - // deactivate further read - disableReplicatorRead(); - }).exceptionally(ex -> { - long waitTimeMs = backOff.next(); - log.warn( - "[{}] Exception: '{}' occurred while trying to close the producer." 
- + " retrying again in {} s", - replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); - return null; - }); } - public synchronized CompletableFuture terminate() { - if (producer == null) { - STATE_UPDATER.set(this, State.Terminated); - return CompletableFuture.completedFuture(null); + public CompletableFuture terminate() { + if (!tryChangeStatusToTerminating()) { + log.info("[{}] Skip current termination since other thread is doing termination, state : {}", replicatorId, + state); + } + synchronized (this) { + if (producer == null) { + STATE_UPDATER.set(this, State.Terminated); + return CompletableFuture.completedFuture(null); + } + CompletableFuture future = producer.closeAsync(); + return future.thenRun(() -> { + STATE_UPDATER.set(this, State.Terminated); + this.producer = null; + // set the cursor as inactive. + disableReplicatorRead(); + }).exceptionally(ex -> { + long waitTimeMs = backOff.next(); + log.warn( + "[{}] Exception: '{}' occurred while trying to terminate the replicator." + + " retrying again in {} s", + replicatorId, ex.getMessage(), waitTimeMs / 1000.0); + // BackOff before retrying + brokerService.executor().schedule(() -> terminate(), + waitTimeMs, TimeUnit.MILLISECONDS); + return null; + }); } - CompletableFuture future = producer.closeAsync(); - return future.thenRun(() -> { - STATE_UPDATER.set(this, State.Terminated); - this.producer = null; - // set the cursor as inactive. - disableReplicatorRead(); - }).exceptionally(ex -> { - long waitTimeMs = backOff.next(); - log.warn( - "[{}] Exception: '{}' occurred while trying to terminate the replicator." 
- + " retrying again in {} s", - replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(() -> terminate(), - waitTimeMs, TimeUnit.MILLISECONDS); - return null; - }); } protected boolean tryChangeStatusToTerminating() { From 6117922d8205237db0de28f1b35ee1d4715de947 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 2 Feb 2024 17:08:35 +0800 Subject: [PATCH 14/22] remove unnecessary imports --- .../pulsar/broker/service/persistent/PersistentReplicator.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 51bd49e83f181..78474a38ffe74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -46,7 +45,6 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; From 1ddafe40d5491cb424a5ab5fc5f245f8a69dd5ad Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 3 Feb 2024 07:56:00 +0800 Subject: [PATCH 15/22] Improve the lock and state changes --- 
.../broker/service/AbstractReplicator.java | 277 +++++++++++------- .../pulsar/broker/service/Replicator.java | 2 +- .../NonPersistentReplicator.java | 4 +- .../persistent/PersistentReplicator.java | 70 +++-- .../service/persistent/PersistentTopic.java | 4 +- .../service/AbstractReplicatorTest.java | 2 +- 6 files changed, 231 insertions(+), 128 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 979aced3c7a9f..c58b3efee949f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -26,10 +26,14 @@ import lombok.Getter; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.Backoff; @@ -68,21 +72,33 @@ public abstract class AbstractReplicator implements Replicator { AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); @VisibleForTesting @Getter - private volatile State state = State.Stopped; + protected volatile State state = State.Stopped; public enum State { + /** + * This enum has two mean meanings:Init, Stopped. 
+ * Regarding the meaning "Stopped", only {@link PersistentTopic#checkGC} will call {@link #disconnect}, + * so this method only be used by {@link PersistentTopic#checkGC} now. + * TODO After improving the method {@link #disconnect)}, we should rename "Stopped" to "Init". + */ // The internal producer is stopped. Stopped, // Trying to create a new internal producer. Starting, // The internal producer has started, and tries copy data. Started, + /** + * @Deprecated Only {@link PersistentTopic#checkGC} will call {@link #disconnect}, so this method only be + * used by {@link PersistentTopic#checkGC} now. + * TODO After improving the method {@link #disconnect)}, this enum should be removed. + */ + @Deprecated // The internal producer is trying to stop. Stopping, // The replicator is in terminating. Terminating, // The replicator is never used again. Pulsar will create a new Replicator when enable replication again. - Terminated + Terminated; } public AbstractReplicator(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName, @@ -116,7 +132,7 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl protected abstract String getProducerName(); - protected abstract void readEntries(org.apache.pulsar.client.api.Producer producer); + protected abstract void setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer producer); protected abstract Position getReplicatorReadPosition(); @@ -128,50 +144,72 @@ public String getRemoteCluster() { return remoteCluster; } - // This method needs to be synchronized with disconnects else if there is a disconnect followed by startProducer - // the end result can be disconnect. 
- public synchronized void startProducer() { - if (STATE_UPDATER.get(this) == State.Stopping) { - long waitTimeMs = backOff.next(); - if (log.isDebugEnabled()) { - log.debug( - "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", - replicatorId, waitTimeMs / 1000.0); - } - // BackOff before retrying - scheduleCheckTopicActiveAndStartProducer(waitTimeMs); - return; - } - if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) { - if (state == State.Started) { - // Already running + public void startProducer() { + // Guarantee only one task call "producerBuilder.createAsync()". + Pair setStartingRes = compareSetAndGetState(State.Stopped, State.Starting); + if (setStartingRes.getLeft()) { + if (setStartingRes.getRight() == State.Starting) { + log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", + replicatorId, state); + } else if (setStartingRes.getRight() == State.Started) { + // Since the method "startProducer" will be called even if it is started, only print debug-level log. if (log.isDebugEnabled()) { - log.debug("[{}] Replicator was already running", replicatorId); + log.debug("[{}] Replicator was already running. state: {}", replicatorId, state); } + } else if (setStartingRes.getRight() == State.Stopping) { + if (log.isDebugEnabled()) { + log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)." + + " state: {}", replicatorId, state); + } + delayStartProducerAfterStopped(); } else { + /** {@link State.Terminating}, {@link State.Terminated}. 
**/ log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state); } - return; } log.info("[{}] Starting replicator", replicatorId); producerBuilder.createAsync().thenAccept(producer -> { - readEntries(producer); + setProducerAndTriggerReadEntries(producer); }).exceptionally(ex -> { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + Pair setStoppedRes = compareSetAndGetState(State.Starting, State.Stopped); + if (setStoppedRes.getLeft()) { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } else { - log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId, - STATE_UPDATER.get(this), ex); + if (setStoppedRes.getRight() == State.Terminating + || setStoppedRes.getRight() == State.Terminated) { + log.info("[{}] Skip to create producer, because it has been terminated, state is : {}", + replicatorId, state); + } else { + /** {@link State.Stopped}, {@link State.Starting}, {@link State.Started} **/ + // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Other thread will try to create the producer again. so skipped current one task." + + " State is : {}", + replicatorId, state); + } } return null; }); + } + /*** + * The producer is stopping, delay to start the producer. + * If we start a producer immediately, we will get a conflict producer(same name producer) registered error. 
+ */ + protected void delayStartProducerAfterStopped() { + long waitTimeMs = backOff.next(); + if (log.isDebugEnabled()) { + log.debug( + "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", + replicatorId, waitTimeMs / 1000.0); + } + scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { @@ -184,30 +222,40 @@ protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) { CompletableFuture> topicFuture = brokerService.getTopics().get(localTopicName); if (topicFuture == null) { // Topic closed. + log.info("[{}] Skip scheduled to start the producer since the topic was closed successfully." + + " And trigger a terminate.", replicatorId); + terminate(); return; } topicFuture.thenAccept(optional -> { if (optional.isEmpty()) { // Topic closed. - log.info("[{}] Skip scheduled to start the producer since the topic was closed.", replicatorId); + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); return; } if (optional.get() != localTopic) { // Topic closed and created a new one, current replicator is outdated. - log.info("[{}] Skip scheduled to start the producer since the topic was closed.", replicatorId); + log.info("[{}] Skip scheduled to start the producer since the topic was closed. And trigger a" + + " terminate.", replicatorId); + terminate(); return; } Replicator replicator = localTopic.getReplicators().get(remoteCluster); if (replicator != AbstractReplicator.this) { // Current replicator has been closed, and created a new one. log.info("[{}] Skip scheduled to start the producer since a new replicator has instead current" - + " one.", replicatorId); + + " one. And trigger a terminate.", replicatorId); + terminate(); return; } startProducer(); }).exceptionally(ex -> { - log.warn("[{}] [{}] Stop retry to create producer due to unknown error. 
Replicator state: {}", + log.warn("[{}] [{}] Stop retry to create producer due to unknown error(topic create failed), and" + + " trigger a terminate. Replicator state: {}", localTopicName, replicatorId, STATE_UPDATER.get(this), ex); + terminate(); return null; }); }, waitTimeMs, TimeUnit.MILLISECONDS); @@ -226,7 +274,13 @@ protected CompletableFuture isLocalTopicActive() { }, brokerService.executor()); } - public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) { + /** + * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now. + * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a + * "cursor.readComplete" execute concurrently. + */ + @Deprecated + public CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) { long backlog = getNumberOfEntriesInBacklog(); if (failIfHasBacklog && backlog > 0) { CompletableFuture disconnectFuture = new CompletableFuture<>(); @@ -238,66 +292,100 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) } log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, getReplicatorReadPosition(), backlog); - return closeProducerAsync(); + return closeProducerAsync(closeTheStartingProducer); } - protected CompletableFuture closeProducerAsync() { - if (!tryChangeStatusToStopping()) { + /** + * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now. + * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a + * "cursor.readComplete" execute concurrently. + */ + @Deprecated + protected CompletableFuture closeProducerAsync(boolean closeTheStartingProducer) { + Pair setStoppingRes = compareSetAndGetState(State.Started, State.Stopping); + if (!setStoppingRes.getLeft()) { + if (setStoppingRes.getRight() == State.Starting) { + if (closeTheStartingProducer) { + /** + * Delay retry(wait for the start producer task is finish). 
+ * Note: If the producer always start fail, the start producer task will always retry. + * Then current task may always retry. But if the state is {@link State.Stopped} next retry will + * be skipped. The better solution is creating a {@link CompletableFuture} to trace the creation. + */ + long waitTimeMs = backOff.next(); + brokerService.executor().schedule(() -> closeProducerAsync(true), + waitTimeMs, TimeUnit.MILLISECONDS); + } else { + log.info("[{}] Skip current producer closing since the previous producer has been closed," + + " and trying start a new one, state : {}", + replicatorId, setStoppingRes.getRight()); + } + } else if (setStoppingRes.getRight() == State.Stopped + || setStoppingRes.getRight() == State.Stopping) { + log.info("[{}] Skip current producer closing since other thread did closing, state : {}", + replicatorId, setStoppingRes.getRight()); + } else if (setStoppingRes.getRight() == State.Terminating + || setStoppingRes.getRight() == State.Terminated) { + log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}", + replicatorId, state); + } log.info("[{}] Skip current termination since other thread is doing close producer or termination," + " state : {}", replicatorId, state); + return CompletableFuture.completedFuture(null); } - synchronized (this) { - if (producer == null) { - tryChangeStatusToStopped(); - return CompletableFuture.completedFuture(null); - } - CompletableFuture future = producer.closeAsync(); - return future.thenRun(() -> { - tryChangeStatusToStopped(); + + // Close producer and update state. + return doCloseProducerAsync(producer, () -> { + Pair setStoppedRes = compareSetAndGetState(State.Stopping, State.Stopped); + if (setStoppedRes.getLeft()) { this.producer = null; // deactivate further read disableReplicatorRead(); - }).exceptionally(ex -> { - long waitTimeMs = backOff.next(); - log.warn( - "[{}] Exception: '{}' occurred while trying to close the producer." 
- + " retrying again in {} s", - replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); - return null; - }); - } + return; + } + if (setStoppedRes.getRight() == State.Terminating || setStoppingRes.getRight() == State.Terminated) { + log.info("[{}] Skip setting state to stopped because it was terminated, state : {}", + replicatorId, state); + } else { + // Since only one task can call "doCloseProducerAsync(producer, action)", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Other task has change the state to stopped. so skipped current one task." + + " State is : {}", + replicatorId, state); + } + }); + } + + protected CompletableFuture doCloseProducerAsync(Producer producer, Runnable actionAfterClosed) { + CompletableFuture future = + producer == null ? CompletableFuture.completedFuture(null) : producer.closeAsync(); + return future.thenRun(() -> { + actionAfterClosed.run(); + }).exceptionally(ex -> { + long waitTimeMs = backOff.next(); + log.warn( + "[{}] Exception: '{}' occurred while trying to close the producer. Replicator state: {}." 
+ + " Retrying again in {} s.", + replicatorId, ex.getMessage(), state, waitTimeMs / 1000.0); + // BackOff before retrying + brokerService.executor().schedule(() -> doCloseProducerAsync(producer, actionAfterClosed), + waitTimeMs, TimeUnit.MILLISECONDS); + return null; + }); } public CompletableFuture terminate() { if (!tryChangeStatusToTerminating()) { log.info("[{}] Skip current termination since other thread is doing termination, state : {}", replicatorId, state); + return CompletableFuture.completedFuture(null); } - synchronized (this) { - if (producer == null) { - STATE_UPDATER.set(this, State.Terminated); - return CompletableFuture.completedFuture(null); - } - CompletableFuture future = producer.closeAsync(); - return future.thenRun(() -> { - STATE_UPDATER.set(this, State.Terminated); - this.producer = null; - // set the cursor as inactive. - disableReplicatorRead(); - }).exceptionally(ex -> { - long waitTimeMs = backOff.next(); - log.warn( - "[{}] Exception: '{}' occurred while trying to terminate the replicator." - + " retrying again in {} s", - replicatorId, ex.getMessage(), waitTimeMs / 1000.0); - // BackOff before retrying - brokerService.executor().schedule(() -> terminate(), - waitTimeMs, TimeUnit.MILLISECONDS); - return null; - }); - } + return doCloseProducerAsync(producer, () -> { + STATE_UPDATER.set(this, State.Terminated); + this.producer = null; + // set the cursor as inactive. 
+ disableReplicatorRead(); + }); } protected boolean tryChangeStatusToTerminating() { @@ -316,29 +404,6 @@ protected boolean tryChangeStatusToTerminating() { return false; } - protected boolean tryChangeStatusToStopping() { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)){ - return true; - } - if (STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)){ - return true; - } - return false; - } - - protected boolean tryChangeStatusToStopped() { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)){ - return true; - } - if (STATE_UPDATER.compareAndSet(this, State.Started, State.Stopped)){ - return true; - } - if (STATE_UPDATER.compareAndSet(this, State.Stopping, State.Stopped)){ - return true; - } - return false; - } - public CompletableFuture remove() { // No-op return CompletableFuture.completedFuture(null); @@ -397,4 +462,18 @@ public static CompletableFuture validatePartitionedTopicAsync(String topic public State getState() { return state; } + + protected ImmutablePair compareSetAndGetState(State expect, State update) { + State original1 = state; + if (STATE_UPDATER.compareAndSet(this, expect, update)) { + return ImmutablePair.of(true, expect); + } + State original2 = state; + // Maybe the value changed more than once even if "original1 == original2", but the probability is very small, + // so let's ignore this case for prevent using a lock. 
+ if (original1 == original2) { + return ImmutablePair.of(false, original1); + } + return compareSetAndGetState(expect, update); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java index 4a8cbe0ca2b74..8130b855b4e4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Replicator.java @@ -31,7 +31,7 @@ public interface Replicator { CompletableFuture terminate(); - CompletableFuture disconnect(boolean failIfHasBacklog); + CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer); void updateRates(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 552d75d460ab8..51509f3818a28 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -67,7 +67,7 @@ protected String getProducerName() { } @Override - protected void readEntries(Producer producer) { + protected void setProducerAndTriggerReadEntries(Producer producer) { this.producer = (ProducerImpl) producer; if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { @@ -78,7 +78,7 @@ protected void readEntries(Producer producer) { "[{}] Replicator was stopped while creating the producer." + " Closing it. 
Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); - closeProducerAsync(); + doCloseProducerAsync(producer, () -> {}); return; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 78474a38ffe74..507797adc49ce 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.broker.service.persistent; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated; +import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminating; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; @@ -42,6 +46,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; @@ -132,29 +137,48 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man } @Override - protected void readEntries(Producer producer) { - // Rewind the cursor to be sure to read again all non-acked messages sent while restarting + protected void setProducerAndTriggerReadEntries(Producer producer) { + // Rewind the cursor to be sure to read 
again all non-acked messages sent while restarting. cursor.rewind(); - cursor.cancelPendingReadRequest(); - HAVE_PENDING_READ_UPDATER.set(this, FALSE); - this.producer = (ProducerImpl) producer; - - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { - log.info("[{}] Created replicator producer", replicatorId); - backOff.reset(); - // activate cursor: so, entries can be cached - this.cursor.setActive(); - // read entries - readMoreEntries(); - } else { - log.info( - "[{}] Replicator was stopped while creating the producer." - + " Closing it. Replicator state: {}", - replicatorId, STATE_UPDATER.get(this)); - closeProducerAsync(); + + /** + * 1. Try change state to {@link Started}. + * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value + * producer when the state is {@link Started}. + */ + Pair changeStateRes; + changeStateRes = compareSetAndGetState(Starting, Started); + if (changeStateRes.getLeft()) { + this.producer = (ProducerImpl) producer; + HAVE_PENDING_READ_UPDATER.set(this, FALSE); + } + + // Close the producer if change the state fail. + if (!changeStateRes.getLeft()) { + if (changeStateRes.getRight() == Started) { + // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. + // So print a warn log. + log.warn("[{}] Replicator was stopped while creating the producer. Closing it. Replicator state: {}", + replicatorId, state); + } else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) { + log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}", + replicatorId, state); + } else { + log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}", + replicatorId, changeStateRes.getRight()); + } + doCloseProducerAsync(producer, () -> {}); + return; } + // Trigger a new read. 
+ log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state); + backOff.reset(); + // activate cursor: so, entries can be cached. + this.cursor.setActive(); + // read entries + readMoreEntries(); } @Override @@ -417,7 +441,7 @@ public CompletableFuture getFuture() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - if (STATE_UPDATER.get(this) != State.Started) { + if (state != Started) { log.info("[{}] Replicator was stopped while reading entries." + " Stop reading. Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); @@ -434,7 +458,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be stopped. - closeProducerAsync(); + terminate(); return; } else if (!(exception instanceof TooManyRequestsException)) { log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})", @@ -448,7 +472,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } HAVE_PENDING_READ_UPDATER.set(this, FALSE); - brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(() -> readMoreEntries(), waitTimeMillis, TimeUnit.MILLISECONDS); } public CompletableFuture clearBacklog() { @@ -553,7 +577,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be stopped. 
- closeProducerAsync(); + terminate(); return; } if (ctx instanceof PositionImpl) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 7a52aaaee463a..0506b250d3e3a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -840,8 +840,8 @@ public CompletableFuture stopReplProducers() { private synchronized CompletableFuture closeReplProducersIfNoBacklog() { List> closeFutures = new ArrayList<>(); - replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true))); - shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true))); + replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true, true))); + shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true, true))); return FutureUtil.waitForAll(closeFutures); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 5b678dfeeac20..7aebf20896c2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -130,7 +130,7 @@ protected String getProducerName() { } @Override - protected void readEntries(Producer producer) { + protected void setProducerAndTriggerReadEntries(Producer producer) { } From 513b1f6e709f1cc3c42dae0cac4726b00f381804 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 3 Feb 2024 17:23:58 +0800 Subject: [PATCH 16/22] fix bug --- .../org/apache/pulsar/broker/service/AbstractReplicator.java | 2 +- 
.../org/apache/pulsar/broker/service/PersistentTopicTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index c58b3efee949f..79a1ab10ba8c9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -147,7 +147,7 @@ public String getRemoteCluster() { public void startProducer() { // Guarantee only one task call "producerBuilder.createAsync()". Pair setStartingRes = compareSetAndGetState(State.Stopped, State.Starting); - if (setStartingRes.getLeft()) { + if (!setStartingRes.getLeft()) { if (setStartingRes.getRight() == State.Starting) { log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", replicatorId, state); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 24eecdabf7538..de9d0272fc002 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -1804,7 +1804,7 @@ public void testClosingReplicationProducerTwice() throws Exception { replicator.startProducer(); - verify(clientImpl, Mockito.times(2)).createProducerAsync(any(), any(), any()); + verify(clientImpl, Mockito.times(1)).createProducerAsync(any(), any(), any()); } @Test From f908292afb121ab33d0698709e20bd9cdc9859e7 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 23 Mar 2024 14:27:40 +0800 Subject: [PATCH 17/22] address comments --- .../broker/service/AbstractReplicator.java | 6 ++--- .../persistent/PersistentReplicator.java | 24 ++++++++----------- 2 files changed, 12 
insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 79a1ab10ba8c9..ea9918e5ba096 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -275,11 +275,10 @@ protected CompletableFuture isLocalTopicActive() { } /** - * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now. + * This method only be used by {@link PersistentTopic#checkGC} now. * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a * "cursor.readComplete" execute concurrently. */ - @Deprecated public CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) { long backlog = getNumberOfEntriesInBacklog(); if (failIfHasBacklog && backlog > 0) { @@ -296,11 +295,10 @@ public CompletableFuture disconnect(boolean failIfHasBacklog, boolean clos } /** - * @Deprecated This method only be used by {@link PersistentTopic#checkGC} now. + * This method only be used by {@link PersistentTopic#checkGC} now. * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a * "cursor.readComplete" execute concurrently. 
*/ - @Deprecated protected CompletableFuture closeProducerAsync(boolean closeTheStartingProducer) { Pair setStoppingRes = compareSetAndGetState(State.Started, State.Stopping); if (!setStoppingRes.getLeft()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 507797adc49ce..f479c7b2baefa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -152,10 +152,14 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { if (changeStateRes.getLeft()) { this.producer = (ProducerImpl) producer; HAVE_PENDING_READ_UPDATER.set(this, FALSE); - } - - // Close the producer if change the state fail. - if (!changeStateRes.getLeft()) { + // Trigger a new read. + log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state); + backOff.reset(); + // activate cursor: so, entries can be cached. + this.cursor.setActive(); + // read entries + readMoreEntries(); + } else { if (changeStateRes.getRight() == Started) { // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. // So print a warn log. @@ -168,17 +172,9 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}", replicatorId, changeStateRes.getRight()); } + // Close the producer if change the state fail. doCloseProducerAsync(producer, () -> {}); - return; } - - // Trigger a new read. - log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state); - backOff.reset(); - // activate cursor: so, entries can be cached. 
- this.cursor.setActive(); - // read entries - readMoreEntries(); } @Override @@ -472,7 +468,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } HAVE_PENDING_READ_UPDATER.set(this, FALSE); - brokerService.executor().schedule(() -> readMoreEntries(), waitTimeMillis, TimeUnit.MILLISECONDS); + brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS); } public CompletableFuture clearBacklog() { From 877452da5bffab6af04c9fa3976788682c4eb76f Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 23 Mar 2024 14:45:23 +0800 Subject: [PATCH 18/22] rename: stop -> disconnect --- .../broker/service/AbstractReplicator.java | 83 +++++++++---------- .../persistent/PersistentReplicator.java | 10 +-- 2 files changed, 45 insertions(+), 48 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index ea9918e5ba096..e9afec6c2feda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -72,29 +72,25 @@ public abstract class AbstractReplicator implements Replicator { AtomicReferenceFieldUpdater.newUpdater(AbstractReplicator.class, State.class, "state"); @VisibleForTesting @Getter - protected volatile State state = State.Stopped; + protected volatile State state = State.Disconnected; public enum State { /** - * This enum has two mean meanings:Init, Stopped. - * Regarding the meaning "Stopped", only {@link PersistentTopic#checkGC} will call {@link #disconnect}, - * so this method only be used by {@link PersistentTopic#checkGC} now. - * TODO After improving the method {@link #disconnect)}, we should rename "Stopped" to "Init". + * This enum has two mean meanings: + * Init: replicator is just created, has not been started now. 
+ * Disconnected: the producer was closed after {@link PersistentTopic#checkGC} called {@link #disconnect}. */ - // The internal producer is stopped. - Stopped, + // The internal producer is disconnected. + Disconnected, // Trying to create a new internal producer. Starting, // The internal producer has started, and tries copy data. Started, /** - * @Deprecated Only {@link PersistentTopic#checkGC} will call {@link #disconnect}, so this method only be - * used by {@link PersistentTopic#checkGC} now. - * TODO After improving the method {@link #disconnect)}, this enum should be removed. + * The producer is closing after {@link PersistentTopic#checkGC} called {@link #disconnect}. */ - @Deprecated - // The internal producer is trying to stop. - Stopping, + // The internal producer is trying to disconnect. + Disconnecting, // The replicator is in terminating. Terminating, // The replicator is never used again. Pulsar will create a new Replicator when enable replication again. @@ -127,7 +123,7 @@ public AbstractReplicator(String localCluster, Topic localTopic, String remoteCl .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // .producerName(getProducerName()); - STATE_UPDATER.set(this, State.Stopped); + STATE_UPDATER.set(this, State.Disconnected); } protected abstract String getProducerName(); @@ -146,7 +142,7 @@ public String getRemoteCluster() { public void startProducer() { // Guarantee only one task call "producerBuilder.createAsync()". - Pair setStartingRes = compareSetAndGetState(State.Stopped, State.Starting); + Pair setStartingRes = compareSetAndGetState(State.Disconnected, State.Starting); if (!setStartingRes.getLeft()) { if (setStartingRes.getRight() == State.Starting) { log.info("[{}] Skip the producer creation since other thread is doing starting, state : {}", @@ -156,12 +152,12 @@ public void startProducer() { if (log.isDebugEnabled()) { log.debug("[{}] Replicator was already running. 
state: {}", replicatorId, state); } - } else if (setStartingRes.getRight() == State.Stopping) { + } else if (setStartingRes.getRight() == State.Disconnecting) { if (log.isDebugEnabled()) { log.debug("[{}] Rep.producer is closing, delay to retry(wait the producer close success)." + " state: {}", replicatorId, state); } - delayStartProducerAfterStopped(); + delayStartProducerAfterDisconnected(); } else { /** {@link State.Terminating}, {@link State.Terminated}. **/ log.info("[{}] Skip the producer creation since the replicator state is : {}", replicatorId, state); @@ -173,20 +169,20 @@ public void startProducer() { producerBuilder.createAsync().thenAccept(producer -> { setProducerAndTriggerReadEntries(producer); }).exceptionally(ex -> { - Pair setStoppedRes = compareSetAndGetState(State.Starting, State.Stopped); - if (setStoppedRes.getLeft()) { + Pair setDisconnectedRes = compareSetAndGetState(State.Starting, State.Disconnected); + if (setDisconnectedRes.getLeft()) { long waitTimeMs = backOff.next(); log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", replicatorId, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying scheduleCheckTopicActiveAndStartProducer(waitTimeMs); } else { - if (setStoppedRes.getRight() == State.Terminating - || setStoppedRes.getRight() == State.Terminated) { + if (setDisconnectedRes.getRight() == State.Terminating + || setDisconnectedRes.getRight() == State.Terminated) { log.info("[{}] Skip to create producer, because it has been terminated, state is : {}", replicatorId, state); } else { - /** {@link State.Stopped}, {@link State.Starting}, {@link State.Started} **/ + /** {@link State.Disconnected}, {@link State.Starting}, {@link State.Started} **/ // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. // So print a warn log. log.warn("[{}] Other thread will try to create the producer again. so skipped current one task." 
@@ -199,10 +195,10 @@ public void startProducer() { } /*** - * The producer is stopping, delay to start the producer. + * The producer is disconnecting, delay to start the producer. * If we start a producer immediately, we will get a conflict producer(same name producer) registered error. */ - protected void delayStartProducerAfterStopped() { + protected void delayStartProducerAfterDisconnected() { long waitTimeMs = backOff.next(); if (log.isDebugEnabled()) { log.debug( @@ -300,15 +296,16 @@ public CompletableFuture disconnect(boolean failIfHasBacklog, boolean clos * "cursor.readComplete" execute concurrently. */ protected CompletableFuture closeProducerAsync(boolean closeTheStartingProducer) { - Pair setStoppingRes = compareSetAndGetState(State.Started, State.Stopping); - if (!setStoppingRes.getLeft()) { - if (setStoppingRes.getRight() == State.Starting) { + Pair setDisconnectingRes = compareSetAndGetState(State.Started, State.Disconnecting); + if (!setDisconnectingRes.getLeft()) { + if (setDisconnectingRes.getRight() == State.Starting) { if (closeTheStartingProducer) { /** * Delay retry(wait for the start producer task is finish). - * Note: If the producer always start fail, the start producer task will always retry. - * Then current task may always retry. But if the state is {@link State.Stopped} next retry will - * be skipped. The better solution is creating a {@link CompletableFuture} to trace the creation. + * Note: If the producer always start fail, the start producer task will always retry until the + * state changed to {@link State.Terminated}. + * TODO The better solution is creating a {@link CompletableFuture} to trace the in-progress + * creation and call "inProgressCreationFuture.thenApply(closeProducer())". 
*/ long waitTimeMs = backOff.next(); brokerService.executor().schedule(() -> closeProducerAsync(true), @@ -316,14 +313,14 @@ protected CompletableFuture closeProducerAsync(boolean closeTheStartingPro } else { log.info("[{}] Skip current producer closing since the previous producer has been closed," + " and trying start a new one, state : {}", - replicatorId, setStoppingRes.getRight()); + replicatorId, setDisconnectingRes.getRight()); } - } else if (setStoppingRes.getRight() == State.Stopped - || setStoppingRes.getRight() == State.Stopping) { + } else if (setDisconnectingRes.getRight() == State.Disconnected + || setDisconnectingRes.getRight() == State.Disconnecting) { log.info("[{}] Skip current producer closing since other thread did closing, state : {}", - replicatorId, setStoppingRes.getRight()); - } else if (setStoppingRes.getRight() == State.Terminating - || setStoppingRes.getRight() == State.Terminated) { + replicatorId, setDisconnectingRes.getRight()); + } else if (setDisconnectingRes.getRight() == State.Terminating + || setDisconnectingRes.getRight() == State.Terminated) { log.info("[{}] Skip current producer closing since other thread is doing termination, state : {}", replicatorId, state); } @@ -334,20 +331,20 @@ protected CompletableFuture closeProducerAsync(boolean closeTheStartingPro // Close producer and update state. 
return doCloseProducerAsync(producer, () -> { - Pair setStoppedRes = compareSetAndGetState(State.Stopping, State.Stopped); - if (setStoppedRes.getLeft()) { + Pair setDisconnectedRes = compareSetAndGetState(State.Disconnecting, State.Disconnected); + if (setDisconnectedRes.getLeft()) { this.producer = null; // deactivate further read disableReplicatorRead(); return; } - if (setStoppedRes.getRight() == State.Terminating || setStoppingRes.getRight() == State.Terminated) { - log.info("[{}] Skip setting state to stopped because it was terminated, state : {}", + if (setDisconnectedRes.getRight() == State.Terminating || setDisconnectingRes.getRight() == State.Terminated) { + log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", replicatorId, state); } else { // Since only one task can call "doCloseProducerAsync(producer, action)", this scenario is not expected. // So print a warn log. - log.warn("[{}] Other task has change the state to stopped. so skipped current one task." + log.warn("[{}] Other task has change the state to terminated. so skipped current one task." 
+ " State is : {}", replicatorId, state); } @@ -393,10 +390,10 @@ protected boolean tryChangeStatusToTerminating() { if (STATE_UPDATER.compareAndSet(this, State.Started, State.Terminating)){ return true; } - if (STATE_UPDATER.compareAndSet(this, State.Stopping, State.Terminating)){ + if (STATE_UPDATER.compareAndSet(this, State.Disconnecting, State.Terminating)){ return true; } - if (STATE_UPDATER.compareAndSet(this, State.Stopped, State.Terminating)) { + if (STATE_UPDATER.compareAndSet(this, State.Disconnected, State.Terminating)) { return true; } return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index f479c7b2baefa..5e1cc4a936a75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -163,8 +163,8 @@ protected void setProducerAndTriggerReadEntries(Producer producer) { if (changeStateRes.getRight() == Started) { // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected. // So print a warn log. - log.warn("[{}] Replicator was stopped while creating the producer. Closing it. Replicator state: {}", - replicatorId, state); + log.warn("[{}] Replicator was already started by another thread while creating the producer." + + " Closing the producer newly created. Replicator state: {}", replicatorId, state); } else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) { log.info("[{}] Replicator was terminated, so close the producer. 
Replicator state: {}", replicatorId, state); @@ -438,7 +438,7 @@ public CompletableFuture getFuture() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { if (state != Started) { - log.info("[{}] Replicator was stopped while reading entries." + log.info("[{}] Replicator was disconnected while reading entries." + " Stop reading. Replicator state: {}", replicatorId, STATE_UPDATER.get(this)); return; @@ -453,7 +453,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Error reading entries because replicator is" + " already deleted and cursor is already closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); - // replicator is already deleted and cursor is already closed so, producer should also be stopped. + // replicator is already deleted and cursor is already closed so, producer should also be disconnected. terminate(); return; } else if (!(exception instanceof TooManyRequestsException)) { @@ -572,7 +572,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { if (exception instanceof CursorAlreadyClosedException) { log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already" + " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception); - // replicator is already deleted and cursor is already closed so, producer should also be stopped. + // replicator is already deleted and cursor is already closed so, producer should also be disconnected. 
terminate(); return; } From a47447156c397596d8ef0ada4eeb81388f88f5d1 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 23 Mar 2024 14:46:12 +0800 Subject: [PATCH 19/22] code format --- .../org/apache/pulsar/broker/service/AbstractReplicator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index e9afec6c2feda..d54805f817eff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -338,7 +338,8 @@ protected CompletableFuture closeProducerAsync(boolean closeTheStartingPro disableReplicatorRead(); return; } - if (setDisconnectedRes.getRight() == State.Terminating || setDisconnectingRes.getRight() == State.Terminated) { + if (setDisconnectedRes.getRight() == State.Terminating + || setDisconnectingRes.getRight() == State.Terminated) { log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", replicatorId, state); } else { From 0675bdaa20541cab6ad673a7d16ac48e9eeed174 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 23 Mar 2024 17:32:14 +0800 Subject: [PATCH 20/22] add a TODO comment --- .../apache/pulsar/broker/service/AbstractReplicator.java | 6 +----- .../broker/service/persistent/PersistentTopic.java | 9 +++++++++ 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index d54805f817eff..f34144deb0ab0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -272,8 +272,6 @@ protected 
CompletableFuture isLocalTopicActive() { /** * This method only be used by {@link PersistentTopic#checkGC} now. - * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a - * "cursor.readComplete" execute concurrently. */ public CompletableFuture disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer) { long backlog = getNumberOfEntriesInBacklog(); @@ -292,8 +290,6 @@ public CompletableFuture disconnect(boolean failIfHasBacklog, boolean clos /** * This method only be used by {@link PersistentTopic#checkGC} now. - * TODO "PersistentReplicator.replicateEntries" may get a NullPointerException if this method and a - * "cursor.readComplete" execute concurrently. */ protected CompletableFuture closeProducerAsync(boolean closeTheStartingProducer) { Pair setDisconnectingRes = compareSetAndGetState(State.Started, State.Disconnecting); @@ -304,7 +300,7 @@ protected CompletableFuture closeProducerAsync(boolean closeTheStartingPro * Delay retry(wait for the start producer task is finish). * Note: If the producer always start fail, the start producer task will always retry until the * state changed to {@link State.Terminated}. - * TODO The better solution is creating a {@link CompletableFuture} to trace the in-progress + * Nit: The better solution is creating a {@link CompletableFuture} to trace the in-progress * creation and call "inProgressCreationFuture.thenApply(closeProducer())". 
*/ long waitTimeMs = backOff.next(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0506b250d3e3a..cc2712d0ca5d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2948,6 +2948,15 @@ public void checkGC() { log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic, maxInactiveDurationInSec); } + /** + * There is a race condition that may cause a NPE: + * - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication. + * - task 2: "closeReplProducersIfNoBacklog" called by current thread will set the variable + * "replicator.producer" to a null value. + * Race condition: task 1 will get a NPE when it tries to send messages using the variable + * "replicator.producer", because task 2 will set this variable to "null". + * TODO Create a separate PR to fix it. 
+ */ closeReplProducersIfNoBacklog().thenRun(() -> { if (hasRemoteProducers()) { if (log.isDebugEnabled()) { From 0390ecfe12002c36995705f69c01105227d82184 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sun, 7 Apr 2024 17:23:52 +0800 Subject: [PATCH 21/22] fix test --- .../org/apache/pulsar/broker/service/ReplicatorTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 82c6ed2160346..a05c3468ea16e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -1834,7 +1834,7 @@ public void testReplicatorWithTTL() throws Exception { persistentTopic.getReplicators().forEach((cluster, replicator) -> { PersistentReplicator persistentReplicator = (PersistentReplicator) replicator; // Pause replicator - persistentReplicator.terminate(); + pauseReplicator(persistentReplicator); }); persistentProducer1.send("V2".getBytes()); @@ -1874,4 +1874,11 @@ public void testReplicatorWithTTL() throws Exception { assertEquals(result, Lists.newArrayList("V1", "V2", "V3", "V4")); } + + private void pauseReplicator(PersistentReplicator replicator) { + Awaitility.await().untilAsserted(() -> { + assertTrue(replicator.isConnected()); + }); + replicator.closeProducerAsync(true); + } } From 5793ca16d0da7a5ea779b5efd45c53f3d07a7be6 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 23 Apr 2024 00:50:34 +0800 Subject: [PATCH 22/22] fix tests --- .../broker/service/OneWayReplicatorTest.java | 77 +++++++++++++------ .../service/OneWayReplicatorTestBase.java | 38 +++++++++ 2 files changed, 92 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 4f051ff90e801..f9184f2288f52 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -21,9 +21,9 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; @@ -33,7 +33,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import lombok.AllArgsConstructor; import lombok.Data; @@ -104,7 +106,7 @@ private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator return originalValue; } - @Test + @Test(timeOut = 45 * 1000) public void testReplicatorProducerStatInTopic() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); final String subscribeName = "subscribe_1"; @@ -130,7 +132,7 @@ public void testReplicatorProducerStatInTopic() throws Exception { }); } - @Test + @Test(timeOut = 45 * 1000) public void testCreateRemoteConsumerFirst() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); Producer producer1 = client1.newProducer(Schema.STRING).topic(topicName).create(); @@ -150,28 +152,49 @@ public void testCreateRemoteConsumerFirst() throws Exception { 
}); } - @Test + @Test(timeOut = 45 * 1000) public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); admin1.topics().createNonPartitionedTopic(topicName); // Wait for replicator started. waitReplicatorStarted(topicName); - PersistentTopic persistentTopic = + PersistentTopic topic1 = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get(); - PersistentReplicator replicator = - (PersistentReplicator) persistentTopic.getReplicators().values().iterator().next(); + PersistentReplicator replicator1 = + (PersistentReplicator) topic1.getReplicators().values().iterator().next(); // Mock an error when calling "replicator.disconnect()" - ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class); - when(mockProducer.closeAsync()).thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex"))); - ProducerImpl originalProducer = overrideProducerForReplicator(replicator, mockProducer); + AtomicBoolean closeFailed = new AtomicBoolean(true); + final ProducerImpl mockProducer = Mockito.mock(ProducerImpl.class); + final AtomicReference originalProducer1 = new AtomicReference(); + doAnswer(invocation -> { + if (closeFailed.get()) { + return CompletableFuture.failedFuture(new Exception("mocked ex")); + } else { + return originalProducer1.get().closeAsync(); + } + }).when(mockProducer).closeAsync(); + originalProducer1.set(overrideProducerForReplicator(replicator1, mockProducer)); // Verify: since the "replicator.producer.closeAsync()" will retry after it failed, the topic unload should be // successful. admin1.topics().unload(topicName); // Verify: After "replicator.producer.closeAsync()" retry again, the "replicator.producer" will be closed // successful. 
- overrideProducerForReplicator(replicator, originalProducer); + closeFailed.set(false); + AtomicReference topic2 = new AtomicReference(); + AtomicReference replicator2 = new AtomicReference(); Awaitility.await().untilAsserted(() -> { - Assert.assertFalse(replicator.isConnected()); + topic2.set((PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get()); + replicator2.set((PersistentReplicator) topic2.get().getReplicators().values().iterator().next()); + // It is a new Topic after reloading. + assertNotEquals(topic2.get(), topic1); + assertNotEquals(replicator2.get(), replicator1); + }); + Awaitility.await().untilAsserted(() -> { + // Old replicator should be closed. + Assert.assertFalse(replicator1.isConnected()); + Assert.assertFalse(originalProducer1.get().isConnected()); + // New replicator should be connected. + Assert.assertTrue(replicator2.get().isConnected()); }); // cleanup. cleanupTopics(() -> { @@ -205,19 +228,26 @@ private void injectMockReplicatorProducerBuilder( // Inject producer decorator. 
doAnswer(invocation -> { Schema schema = (Schema) invocation.getArguments()[0]; - ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) internalClient.newProducer(schema); + ProducerBuilderImpl producerBuilder = (ProducerBuilderImpl) internalClient.newProducer(schema); ProducerBuilder spyProducerBuilder = spy(producerBuilder); doAnswer(ignore -> { CompletableFuture producerFuture = new CompletableFuture<>(); - final ProducerImpl p = (ProducerImpl) producerBuilder.create(); - new FastThreadLocalThread(() -> { - try { - ProducerImpl newProducer = producerDecorator.apply(producerBuilder.getConf(), p); - producerFuture.complete(newProducer); - } catch (Exception ex) { - producerFuture.completeExceptionally(ex); + producerBuilder.createAsync().whenComplete((p, t) -> { + if (t != null) { + producerFuture.completeExceptionally(t); + return; } - }).start(); + ProducerImpl pImpl = (ProducerImpl) p; + new FastThreadLocalThread(() -> { + try { + ProducerImpl newProducer = producerDecorator.apply(producerBuilder.getConf(), pImpl); + producerFuture.complete(newProducer); + } catch (Exception ex) { + producerFuture.completeExceptionally(ex); + } + }).start(); + }); + return producerFuture; }).when(spyProducerBuilder).createAsync(); return spyProducerBuilder; @@ -306,7 +336,7 @@ void startCallback() { * - The topic is wholly closed. * - Verify: the delayed created internal producer will be closed. */ - @Test + @Test(timeOut = 120 * 1000) public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception { final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_"); // Inject an error for "replicator.producer" creation. @@ -347,7 +377,8 @@ public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception // Stuck start new producer, until the state of replicator change to Stopped. // The next once of "createProducerSuccessAfterFailTimes" to create producer will be successfully. 
Awaitility.await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(60)).untilAsserted(() -> { - assertTrue(createProducerCounter.get() >= failTimes); + assertTrue(createProducerCounter.get() >= failTimes, + "count of retry to create producer is " + createProducerCounter.get()); }); CompletableFuture topicCloseFuture = persistentTopic.close(true); Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java index 8a8dd6f948d31..8e8b444f952c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java @@ -18,21 +18,28 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION; import com.google.common.collect.Sets; import java.net.URL; +import java.time.Duration; import java.util.Collections; import java.util.Optional; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.tests.TestRetrySupport; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.apache.pulsar.zookeeper.ZookeeperServerTest; +import org.awaitility.Awaitility; +import org.testng.Assert; 
@Slf4j public abstract class OneWayReplicatorTestBase extends TestRetrySupport { @@ -140,10 +147,32 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception { } protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception { + waitChangeEventsInit(defaultNamespace); admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Collections.singleton(cluster1)); admin1.namespaces().unload(defaultNamespace); cleanupTopicAction.run(); admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1, cluster2)); + waitChangeEventsInit(defaultNamespace); + } + + protected void waitChangeEventsInit(String namespace) { + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService() + .getTopic(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, false) + .join().get(); + Awaitility.await().atMost(Duration.ofSeconds(180)).untilAsserted(() -> { + TopicStatsImpl topicStats = topic.getStats(true, false, false); + topicStats.getSubscriptions().entrySet().forEach(entry -> { + // No wait for compaction. + if (COMPACTION_SUBSCRIPTION.equals(entry.getKey())) { + return; + } + // No wait for durable cursor. + if (entry.getValue().isDurable()) { + return; + } + Assert.assertTrue(entry.getValue().getMsgBacklog() == 0, entry.getKey()); + }); + }); } protected interface CleanupTopicAction { @@ -185,10 +214,19 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); config.setEnableReplicatedSubscriptions(true); config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + config.setLoadBalancerSheddingEnabled(false); } @Override protected void cleanup() throws Exception { + // delete namespaces. 
+ waitChangeEventsInit(defaultNamespace); + admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1)); + admin1.namespaces().deleteNamespace(defaultNamespace); + admin2.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster2)); + admin2.namespaces().deleteNamespace(defaultNamespace); + + // shutdown. markCurrentSetupNumberCleaned(); log.info("--- Shutting down ---");