From 9c7ca2d3b7c8683c42b9f7592aeeb123a63b911b Mon Sep 17 00:00:00 2001 From: daojun Date: Mon, 12 Dec 2022 18:29:52 +0800 Subject: [PATCH 01/12] Fix PendingAckHandleImpl relay failed --- .../persistent/PersistentSubscription.java | 32 +++++ .../service/persistent/PersistentTopic.java | 110 +++++++++++------- 2 files changed, 101 insertions(+), 41 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f88ce03532414..2951b2ab3a7f0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -31,6 +31,8 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -125,6 +127,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; + private final CompletableFuture pendingAckHandleInitFuture; private volatile Map subscriptionProperties; static { @@ -155,11 +158,19 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); + this.pendingAckHandleInitFuture = new CompletableFuture<>(); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !isEventSystemTopic(TopicName.get(topicName))) { this.pendingAckHandle = new PendingAckHandleImpl(this); + pendingAckHandle.pendingAckHandleFuture() + .thenAccept(__ -> pendingAckHandleInitFuture.complete(null)) + .exceptionally(t -> { + pendingAckHandleInitFuture.completeExceptionally(t); + return null; + }); } else { this.pendingAckHandle = new PendingAckHandleDisabled(); + pendingAckHandleInitFuture.complete(null); } IS_FENCED_UPDATER.set(this, FALSE); } @@ -890,6 +901,23 @@ public CompletableFuture close() { } } + protected void retryClose() { + ScheduledExecutorService scheduler = topic.getBrokerService().pulsar().getExecutor(); + scheduler.schedule(() -> { + close() + .thenAccept(unused -> { + if (log.isDebugEnabled()) { + log.debug("Close subscription {} finished.", subName); + } + }) + .exceptionally(t -> { + log.warn("Close subscription {} failed.", subName, t); + retryClose(); + return null; + }); + }, 20, TimeUnit.SECONDS); + } + /** * Disconnect all consumers attached to the dispatcher and close this subscription. * @@ -1293,6 +1321,10 @@ public PendingAckHandle getPendingAckHandle() { return pendingAckHandle; } + public CompletableFuture getPendingAckHandleInitFuture() { + return this.pendingAckHandleInitFuture; + } + public void syncBatchPositionBitSetForPendingAck(PositionImpl position) { this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2066ac2ee2988..8708b27740c77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -282,12 +282,22 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS // ignore it for now and let the message dedup logic to take care of it } else { final String subscriptionName = Codec.decode(cursor.getName()); - subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, + PersistentSubscription subscription = createPersistentSubscription(subscriptionName, cursor, PersistentSubscription.isCursorFromReplicatedSubscription(cursor), - cursor.getCursorProperties())); - // subscription-cursor gets activated by default: deactivate as there is no active subscription right - // now - subscriptions.get(subscriptionName).deactivateCursor(); + cursor.getCursorProperties()); + subscription.getPendingAckHandleInitFuture() + .thenAccept(unused -> { + subscriptions.put(subscriptionName, subscription); + // subscription-cursor gets activated by default: deactivate as there is no active subscription right + // now + subscriptions.get(subscriptionName).deactivateCursor(); + }) + .exceptionally(t -> { + log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " + + "when initialize topic [{}].", subscriptionName, topic, t); + subscription.retryClose(); + return null; + }); } } this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger); @@ -805,44 +815,62 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted, subscriptionProperties); - CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { - Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, - consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, - readCompacted, keySharedMeta, startMessageId, consumerEpoch); - - return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { - checkBackloggedCursors(); - if (!cnx.isActive()) { - try { - consumer.close(); - } catch (BrokerServiceException e) { - if (e instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", - topic, subscriptionName, consumerId, consumerName); - } else if (e instanceof SubscriptionBusyException) { - log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); - } + CompletableFuture future = subscriptionFuture + .thenCompose(subscription -> { + CompletableFuture f = new CompletableFuture<>(); + PersistentSubscription psub = (PersistentSubscription) subscription; + psub.getPendingAckHandleInitFuture() + .thenAccept(unused -> f.complete(subscription)) + .exceptionally(t -> { + log.warn("Subscription [{}] pendingAckHandle initialize failed. Ready to close.", + subscriptionName, t); + psub.deactivateCursor(); + subscriptions.remove(subscriptionName); + f.completeExceptionally(t); + // No need to close cursor. + psub.retryClose(); + return null; + }); + return f; + }) + .thenCompose(subscription -> { + Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, + consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, + readCompacted, keySharedMeta, startMessageId, consumerEpoch); + + return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { + checkBackloggedCursors(); + if (!cnx.isActive()) { + try { + consumer.close(); + } catch (BrokerServiceException e) { + if (e instanceof ConsumerBusyException) { + log.warn("[{}][{}] Consumer {} {} already connected", + topic, subscriptionName, consumerId, consumerName); + } else if (e instanceof SubscriptionBusyException) { + log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); + } - decrementUsageCount(); - return FutureUtil.failedFuture(e); - } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, - consumer.consumerName(), currentUsageCount()); - } + decrementUsageCount(); + return FutureUtil.failedFuture(e); + } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, + consumer.consumerName(), currentUsageCount()); + } - decrementUsageCount(); - return FutureUtil.failedFuture( - new BrokerServiceException("Connection was closed while the opening the cursor ")); - } else { - checkReplicatedSubscriptionControllerState(); - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); - } - return CompletableFuture.completedFuture(consumer); - } - }); - }); + decrementUsageCount(); + return FutureUtil.failedFuture( + new BrokerServiceException("Connection was closed while the opening the cursor ")); + } else { + checkReplicatedSubscriptionControllerState(); + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); + } + return CompletableFuture.completedFuture(consumer); + } + }); + }); future.exceptionally(ex -> { decrementUsageCount(); From f81902bf538ac75f124ea81179d7b0d16b3b92d9 Mon Sep 17 00:00:00 2001 From: daojun Date: Mon, 12 Dec 2022 18:55:43 +0800 Subject: [PATCH 02/12] fix checkstyle --- .../broker/service/persistent/PersistentTopic.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8708b27740c77..c5f11b4029834 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -288,13 +288,13 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS subscription.getPendingAckHandleInitFuture() .thenAccept(unused -> { subscriptions.put(subscriptionName, subscription); - // subscription-cursor gets activated by default: deactivate as there is no active subscription right - // now + // subscription-cursor gets activated by default: deactivate as there is no active + // subscription right now subscriptions.get(subscriptionName).deactivateCursor(); }) .exceptionally(t -> { - log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " + - "when initialize topic [{}].", subscriptionName, topic, t); + log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " + + "when initialize topic [{}].", subscriptionName, topic, t); subscription.retryClose(); return null; }); @@ -861,11 +861,13 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St decrementUsageCount(); return FutureUtil.failedFuture( - new BrokerServiceException("Connection was closed while the opening the cursor ")); + new BrokerServiceException( + "Connection was closed while the opening the cursor ")); } else { checkReplicatedSubscriptionControllerState(); if (log.isDebugEnabled()) { - log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); + log.debug("[{}][{}] Created new subscription for {}", + topic, subscriptionName, consumerId); } return CompletableFuture.completedFuture(consumer); } From 938dfdffcb1cf9a2a997f2d296468e77d4a1e16f Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 13 Dec 2022 19:34:26 +0800 Subject: [PATCH 03/12] complete tests --- .../service/persistent/PersistentTopic.java | 6 +- .../persistent/PersistentTopicTest.java | 149 +++++++++++++++++- 2 files changed, 149 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c5f11b4029834..1f856bf7db185 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -918,7 +918,8 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH); } - private CompletableFuture getDurableSubscription(String subscriptionName, + @VisibleForTesting + public CompletableFuture getDurableSubscription(String subscriptionName, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated, Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); @@ -978,7 +979,8 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { return subscriptionFuture; } - private CompletableFuture getNonDurableSubscription(String subscriptionName, + @VisibleForTesting + public CompletableFuture getNonDurableSubscription(String subscriptionName, MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean isReadCompacted, Map subscriptionProperties) { log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index aa05624a5b0c9..810b0a21df1e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -35,8 +35,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -44,6 +46,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.*; @@ -53,6 +56,7 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; import org.junit.Assert; +import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -279,8 +283,8 @@ public void testPersistentPartitionedTopicUnload() throws Exception { @DataProvider(name = "topicAndMetricsLevel") public Object[][] indexPatternTestData() { return new Object[][]{ - new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", true}, - new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", false}, + new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", true}, + new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", false}, }; } @@ -365,8 +369,8 @@ public void testUpdateCursorLastActive() throws Exception { Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); - PersistentSubscription persistentSubscription = topic.getSubscription(sharedSubName); - PersistentSubscription persistentSubscription2 = topic.getSubscription(failoverSubName); + PersistentSubscription persistentSubscription = topic.getSubscription(sharedSubName); + PersistentSubscription persistentSubscription2 = topic.getSubscription(failoverSubName); // `addConsumer` should update last active assertTrue(persistentSubscription.getCursor().getLastActive() > beforeAddConsumerTimestamp); @@ -402,4 +406,141 @@ public void testUpdateCursorLastActive() throws Exception { assertTrue(persistentSubscription.getCursor().getLastActive() > beforeRemoveConsumerTimestamp); assertTrue(persistentSubscription2.getCursor().getLastActive() > beforeRemoveConsumerTimestamp); } + + + @Test + public void testPendingAckHandleRelayFailedWhenCreateNewSub() throws Exception { + String topic = "persistent://prop/ns-123/aTopic"; + admin.namespaces().createNamespace(TopicName.get(topic).getNamespace()); + admin.topics().createNonPartitionedTopic(topic); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topic, false).get().get(); + Assert.assertNotNull(persistentTopic); + + PersistentTopic mpt = Mockito.spy(persistentTopic); + pulsar.getBrokerService().getTopics().put(topic, CompletableFuture.completedFuture(Optional.of(mpt))); + + PersistentSubscription subscription = Mockito.mock(PersistentSubscription.class); + Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException("aaaaaaaaaaa"))) + .when(subscription).getPendingAckHandleInitFuture(); + + Mockito.doReturn(CompletableFuture.completedFuture(subscription)).when(mpt).getDurableSubscription(Mockito.any(), Mockito.any(), + Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); + Mockito.doReturn(CompletableFuture.completedFuture(subscription)).when(mpt).getNonDurableSubscription(Mockito.anyString(), Mockito.any(), + Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); + + try (Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub") + .subscribe()) { + Assert.fail(); + } catch (Exception t) { + // ignore + } + + try (Reader reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub1") + .startMessageId(MessageId.earliest) + .startMessageFromRollbackDuration(0, TimeUnit.SECONDS) + .create()) { + Assert.fail(); + } catch (Exception t) { + // ignore + } + + Assert.assertEquals(mpt.getSubscriptions().size(), 0); + + pulsar.getBrokerService().getTopics() + .put(topic, CompletableFuture.completedFuture(Optional.of(persistentTopic))); + + + try (Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub") + .subscribe(); + Reader reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub1") + .startMessageId(MessageId.earliest) + .startMessageFromRollbackDuration(0, TimeUnit.SECONDS) + .create()) { + Assert.assertEquals(persistentTopic.getSubscriptions().size(), 2); + } catch (Exception t) { + Assert.fail(); + } + } + + @Test + public void testPendingAckHandleRelayFailedWhenReloadTopic() throws Exception { + String topic = "persistent://prop/ns-123/btopic_" + UUID.randomUUID(); + admin.namespaces().createNamespace(TopicName.get(topic).getNamespace()); + admin.topics().createNonPartitionedTopic(topic); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topic, false).get().get(); + Assert.assertNotNull(persistentTopic); + + try (Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create()) { + for (int a = 0; a < 100; a++) { + producer.send(UUID.randomUUID().toString()); + } + } + + int processed = 0; + try (Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .isAckReceiptEnabled(true) + .subscriptionName("test_sub") + .subscribe()) { + for (int a = 0; a < 50; a++) { + Message message = consumer.receive(20, TimeUnit.SECONDS); + if (message != null) { + consumer.acknowledge(message); + processed++; + } else { + break; + } + } + } + + + CountDownLatch latch = new CountDownLatch(1); + + // Mock reload topic subscriptions failed. + Subscription subscription = persistentTopic.getSubscriptions().remove("test_sub"); + subscription.close() + .thenAccept(unused -> latch.countDown()); + + if (!latch.await(1, TimeUnit.MINUTES)) { + Assert.fail(); + } + + int processed1 = 0; + try (Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .isAckReceiptEnabled(true) + .subscriptionName("test_sub") + .subscribe()) { + while (true) { + Message message = consumer1.receive(10, TimeUnit.SECONDS); + if (null != message) { + consumer1.acknowledge(message); + processed1++; + } else { + break; + } + } + } + + Assert.assertEquals(processed + processed1, 100); + } } From 6b7fe4391f373d993d2e03daa9c5097dbcb3c94e Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 13 Dec 2022 19:40:18 +0800 Subject: [PATCH 04/12] fix imports --- .../service/persistent/PersistentTopicTest.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 810b0a21df1e3..b7037b9bf6431 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -49,7 +49,18 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; From 5e3b841b8b2e6642c893285f04a4afbd8103a4c0 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 14 Dec 2022 03:30:30 +0800 Subject: [PATCH 05/12] Cancel initialize subscriptions in PersistentTopic's constructor. --- .../service/persistent/PersistentTopic.java | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1f856bf7db185..67444fe826163 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -275,31 +275,6 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient()); - for (ManagedCursor cursor : ledger.getCursors()) { - if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME) - || cursor.getName().startsWith(replicatorPrefix)) { - // This is not a regular subscription, we are going to - // ignore it for now and let the message dedup logic to take care of it - } else { - final String subscriptionName = Codec.decode(cursor.getName()); - PersistentSubscription subscription = createPersistentSubscription(subscriptionName, cursor, - PersistentSubscription.isCursorFromReplicatedSubscription(cursor), - cursor.getCursorProperties()); - subscription.getPendingAckHandleInitFuture() - .thenAccept(unused -> { - subscriptions.put(subscriptionName, subscription); - // subscription-cursor gets activated by default: deactivate as there is no active - // subscription right now - subscriptions.get(subscriptionName).deactivateCursor(); - }) - .exceptionally(t -> { - log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " - + "when initialize topic [{}].", subscriptionName, topic, t); - subscription.retryClose(); - return null; - }); - } - } this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger); if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) { topicEpoch = Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME))); From 4fd2e3d0c98c01b8ac401c0c3930145363700612 Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 14 Dec 2022 05:01:30 +0800 Subject: [PATCH 06/12] refactor --- .../persistent/PersistentSubscription.java | 14 ++++---- .../service/persistent/PersistentTopic.java | 34 ++++++++++++++++--- .../persistent/PersistentTopicTest.java | 2 +- 3 files changed, 37 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 2951b2ab3a7f0..e5dd307bf104f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -127,7 +127,7 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; - private final CompletableFuture pendingAckHandleInitFuture; + private final CompletableFuture initializeFuture; private volatile Map subscriptionProperties; static { @@ -158,19 +158,19 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); - this.pendingAckHandleInitFuture = new CompletableFuture<>(); + this.initializeFuture = new CompletableFuture<>(); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !isEventSystemTopic(TopicName.get(topicName))) { this.pendingAckHandle = new PendingAckHandleImpl(this); pendingAckHandle.pendingAckHandleFuture() - .thenAccept(__ -> pendingAckHandleInitFuture.complete(null)) + .thenAccept(__ -> initializeFuture.complete(null)) .exceptionally(t -> { - pendingAckHandleInitFuture.completeExceptionally(t); + initializeFuture.completeExceptionally(t); return null; }); } else { this.pendingAckHandle = new PendingAckHandleDisabled(); - pendingAckHandleInitFuture.complete(null); + initializeFuture.complete(null); } IS_FENCED_UPDATER.set(this, FALSE); } @@ -1321,8 +1321,8 @@ public PendingAckHandle getPendingAckHandle() { return pendingAckHandle; } - public CompletableFuture getPendingAckHandleInitFuture() { - return this.pendingAckHandleInitFuture; + public CompletableFuture getInitializeFuture() { + return this.initializeFuture; } public void syncBatchPositionBitSetForPendingAck(PositionImpl position) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 67444fe826163..0f40e85aa1922 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -274,7 +274,31 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS registerTopicPolicyListener(); this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient()); - + for (ManagedCursor cursor : ledger.getCursors()) { + if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME) + || cursor.getName().startsWith(replicatorPrefix)) { + // This is not a regular subscription, we are going to + // ignore it for now and let the message dedup logic to take care of it + } else { + final String subscriptionName = Codec.decode(cursor.getName()); + PersistentSubscription subscription = createPersistentSubscription(subscriptionName, cursor, + PersistentSubscription.isCursorFromReplicatedSubscription(cursor), + cursor.getCursorProperties()); + subscriptions.put(subscriptionName, subscription); + subscription.getInitializeFuture() + .exceptionally(t -> { + log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " + + "when initialize topic [{}].", subscriptionName, topic, t); + if (subscriptions.remove(subscriptionName, subscription)) { + subscription.retryClose(); + } else { + log.warn("[{}] Remove subscription {} from subscriptions failed.", + topic, subscriptionName); + } + return null; + }); + } + } this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger); if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) { topicEpoch = Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME))); @@ -793,17 +817,17 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St CompletableFuture future = subscriptionFuture .thenCompose(subscription -> { CompletableFuture f = new CompletableFuture<>(); - PersistentSubscription psub = (PersistentSubscription) subscription; - psub.getPendingAckHandleInitFuture() + PersistentSubscription persistentSub = (PersistentSubscription) subscription; + persistentSub.getInitializeFuture() .thenAccept(unused -> f.complete(subscription)) .exceptionally(t -> { log.warn("Subscription [{}] pendingAckHandle initialize failed. Ready to close.", subscriptionName, t); - psub.deactivateCursor(); + persistentSub.deactivateCursor(); subscriptions.remove(subscriptionName); f.completeExceptionally(t); // No need to close cursor. - psub.retryClose(); + persistentSub.retryClose(); return null; }); return f; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index b7037b9bf6431..7b41230c56f00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -434,7 +434,7 @@ public void testPendingAckHandleRelayFailedWhenCreateNewSub() throws Exception { PersistentSubscription subscription = Mockito.mock(PersistentSubscription.class); Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException("aaaaaaaaaaa"))) - .when(subscription).getPendingAckHandleInitFuture(); + .when(subscription).getInitializeFuture(); Mockito.doReturn(CompletableFuture.completedFuture(subscription)).when(mpt).getDurableSubscription(Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); From 5ceecea3e0f6457401a1cbdbb17c261e00c57ecb Mon Sep 17 00:00:00 2001 From: daojun Date: Mon, 30 Jan 2023 16:31:11 +0800 Subject: [PATCH 07/12] merge master --- .../pulsar/broker/service/persistent/PersistentTopicTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index fd62ac6f92c11..32a5854e32d25 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; From 348b6bd5ca1cd1c6854f001e722988dc589e1413 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 31 Jan 2023 12:30:41 +0800 Subject: [PATCH 08/12] fix check style --- .../pulsar/broker/service/persistent/PersistentTopic.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 23e01d5021e4b..cf07e649b0431 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -864,11 +864,13 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St decrementUsageCount(); return FutureUtil.failedFuture( - new BrokerServiceException("Connection was closed while the opening the cursor ")); + new BrokerServiceException( + "Connection was closed while the opening the cursor ")); } else { checkReplicatedSubscriptionControllerState(); if (log.isDebugEnabled()) { - log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); + log.debug("[{}][{}] Created new subscription for {}", + topic, subscriptionName, consumerId); } return CompletableFuture.completedFuture(consumer); } From 5e7ac1e3acabce759975b5c87cc086393ac7596b Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 9 Feb 2023 15:52:34 +0800 Subject: [PATCH 09/12] fix issue --- .../persistent/PersistentSubscription.java | 13 --- .../service/persistent/PersistentTopic.java | 105 ++++++++---------- .../PendingAckHandleReplayException.java | 25 +++++ .../pendingack/impl/PendingAckHandleImpl.java | 4 +- .../persistent/PersistentTopicTest.java | 9 +- 5 files changed, 81 insertions(+), 75 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/PendingAckHandleReplayException.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 02ff7f407ba44..49045f862a500 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -127,7 +127,6 @@ public class PersistentSubscription extends AbstractSubscription implements Subs private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; private final PendingAckHandle pendingAckHandle; - private final CompletableFuture initializeFuture; private volatile Map subscriptionProperties; static { @@ -158,19 +157,11 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); - this.initializeFuture = new CompletableFuture<>(); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() && !isEventSystemTopic(TopicName.get(topicName))) { this.pendingAckHandle = new PendingAckHandleImpl(this); - pendingAckHandle.pendingAckHandleFuture() - .thenAccept(__ -> initializeFuture.complete(null)) - .exceptionally(t -> { - initializeFuture.completeExceptionally(t); - return null; - }); } else { this.pendingAckHandle = new PendingAckHandleDisabled(); - initializeFuture.complete(null); } IS_FENCED_UPDATER.set(this, FALSE); } @@ -1323,10 +1314,6 @@ public PendingAckHandle getPendingAckHandle() { return pendingAckHandle; } - public CompletableFuture getInitializeFuture() { - return this.initializeFuture; - } - public void syncBatchPositionBitSetForPendingAck(PositionImpl position) { this.pendingAckHandle.syncBatchPositionAckSetForTransaction(position); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18e7770e8f711..fa0d9c2401048 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -115,6 +115,7 @@ import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; +import org.apache.pulsar.broker.transaction.pendingack.exceptions.PendingAckHandleReplayException; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; @@ -295,7 +296,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS PersistentSubscription.isCursorFromReplicatedSubscription(cursor), cursor.getCursorProperties()); subscriptions.put(subscriptionName, subscription); - subscription.getInitializeFuture() + subscription.getPendingAckHandle() + .pendingAckHandleFuture() .exceptionally(t -> { log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " + "when initialize topic [{}].", subscriptionName, topic, t); @@ -824,66 +826,46 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted, subscriptionProperties); - CompletableFuture future = subscriptionFuture - .thenCompose(subscription -> { - CompletableFuture f = new CompletableFuture<>(); - PersistentSubscription persistentSub = (PersistentSubscription) subscription; - persistentSub.getInitializeFuture() - .thenAccept(unused -> f.complete(subscription)) - .exceptionally(t -> { - log.warn("Subscription [{}] pendingAckHandle initialize failed. Ready to close.", - subscriptionName, t); - persistentSub.deactivateCursor(); - subscriptions.remove(subscriptionName); - f.completeExceptionally(t); - // No need to close cursor. - persistentSub.retryClose(); - return null; - }); - return f; - }) - .thenCompose(subscription -> { - Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, - consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, - readCompacted, keySharedMeta, startMessageId, consumerEpoch); - - return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { - if (subscription instanceof PersistentSubscription persistentSubscription) { - checkBackloggedCursor(persistentSubscription); + CompletableFuture future = subscriptionFuture.thenCompose(subscription -> { + Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, + consumerName, isDurable, cnx, cnx.getAuthRole(), metadata, + readCompacted, keySharedMeta, startMessageId, consumerEpoch); + + return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { + if (subscription instanceof PersistentSubscription persistentSubscription) { + checkBackloggedCursor(persistentSubscription); + } + if (!cnx.isActive()) { + try { + consumer.close(); + } catch (BrokerServiceException e) { + if (e instanceof ConsumerBusyException) { + log.warn("[{}][{}] Consumer {} {} already connected", + topic, subscriptionName, consumerId, consumerName); + } else if (e instanceof SubscriptionBusyException) { + log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); } - if (!cnx.isActive()) { - try { - consumer.close(); - } catch (BrokerServiceException e) { - if (e instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", - topic, subscriptionName, consumerId, consumerName); - } else if (e instanceof SubscriptionBusyException) { - log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); - } - decrementUsageCount(); - return FutureUtil.failedFuture(e); - } - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, - consumer.consumerName(), currentUsageCount()); - } + decrementUsageCount(); + return FutureUtil.failedFuture(e); + } + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, + consumer.consumerName(), currentUsageCount()); + } - decrementUsageCount(); - return FutureUtil.failedFuture( - new BrokerServiceException( - "Connection was closed while the opening the cursor ")); - } else { - checkReplicatedSubscriptionControllerState(); - if (log.isDebugEnabled()) { - log.debug("[{}][{}] Created new subscription for {}", - topic, subscriptionName, consumerId); - } - return CompletableFuture.completedFuture(consumer); - } - }); - }); + decrementUsageCount(); + return FutureUtil.failedFuture( + new BrokerServiceException("Connection was closed while the opening the cursor ")); + } else { + checkReplicatedSubscriptionControllerState(); + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); + } + return CompletableFuture.completedFuture(consumer); + } + }); + }); future.exceptionally(ex -> { decrementUsageCount(); @@ -906,7 +888,12 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException && isCompactionSubscription(subscriptionName)) { log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage()); - } else { + } else if (ex instanceof PendingAckHandleReplayException) { + subscriptions.remove(subscriptionName); + log.warn("[{}] Failed to create subscription {} due to PendingAckHandle recover failed.", + topic, subscriptionName, ex); + } + else { log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex); } return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/PendingAckHandleReplayException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/PendingAckHandleReplayException.java new file mode 100644 index 0000000000000..97545e3dda16e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/PendingAckHandleReplayException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.exceptions; + +public class PendingAckHandleReplayException extends Exception { + public PendingAckHandleReplayException(Throwable t) { + super(t); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index ed78feb453d1d..51babd98c4de4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -53,6 +53,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.broker.transaction.pendingack.exceptions.PendingAckHandleReplayException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; @@ -945,7 +946,8 @@ public void completeHandleFuture() { } public void exceptionHandleFuture(Throwable t) { - final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t); + final boolean completedNow = this.pendingAckHandleCompletableFuture + .completeExceptionally(new PendingAckHandleReplayException(t)); if (completedNow) { recoverTime.setRecoverEndTime(System.currentTimeMillis()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 32a5854e32d25..1f049810d81c9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -50,6 +50,8 @@ import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.broker.transaction.pendingack.exceptions.PendingAckHandleReplayException; +import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -435,9 +437,12 @@ public void testPendingAckHandleRelayFailedWhenCreateNewSub() throws Exception { pulsar.getBrokerService().getTopics().put(topic, CompletableFuture.completedFuture(Optional.of(mpt))); PersistentSubscription subscription = Mockito.mock(PersistentSubscription.class); - Mockito.doReturn(CompletableFuture.failedFuture(new RuntimeException("aaaaaaaaaaa"))) - .when(subscription).getInitializeFuture(); + PendingAckHandleImpl pendingAckHandle = Mockito.mock(PendingAckHandleImpl.class); + Mockito.doReturn(CompletableFuture + .failedFuture(new PendingAckHandleReplayException(new RuntimeException("This is an exception")))) + .when(pendingAckHandle.pendingAckHandleFuture()); + Mockito.doReturn(pendingAckHandle).when(subscription).getPendingAckHandle(); Mockito.doReturn(CompletableFuture.completedFuture(subscription)).when(mpt).getDurableSubscription(Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); Mockito.doReturn(CompletableFuture.completedFuture(subscription)).when(mpt).getNonDurableSubscription(Mockito.anyString(), Mockito.any(), From 0bb9cc2ab1389c32fc14e25a94c2eb655bdf9d0c Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 9 Feb 2023 19:10:57 +0800 Subject: [PATCH 10/12] fix issue --- .../pulsar/broker/service/persistent/PersistentTopic.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fa0d9c2401048..72ce24f1a9a93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -889,7 +889,10 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St && isCompactionSubscription(subscriptionName)) { log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage()); } else if (ex instanceof PendingAckHandleReplayException) { - subscriptions.remove(subscriptionName); + PersistentSubscription subscription = subscriptions.remove(subscriptionName); + if (subscription != null) { + subscription.retryClose(); + } log.warn("[{}] Failed to create subscription {} due to PendingAckHandle recover failed.", topic, subscriptionName, ex); } From 8ac4924ee2d759a3f37cabc3b0cf5e4322ca880a Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 9 Feb 2023 19:37:34 +0800 Subject: [PATCH 11/12] fix checkstyle --- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d5f4be0f9b457..ec03fdea559bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -906,8 +906,7 @@ && isCompactionSubscription(subscriptionName)) { } log.warn("[{}] Failed to create subscription {} due to PendingAckHandle recover failed.", topic, subscriptionName, ex); - } - else { + } else { log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex); } return null; From 10bbd1d6536729ce3820583f23e0aac06b6da941 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 9 Feb 2023 23:28:44 +0800 Subject: [PATCH 12/12] fix test & issue --- .../service/persistent/PersistentTopic.java | 2 +- .../persistent/PersistentTopicTest.java | 27 +++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ec03fdea559bc..37e8cee6046a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -899,7 +899,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException && isCompactionSubscription(subscriptionName)) { log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage()); - } else if (ex instanceof PendingAckHandleReplayException) { + } else if (ex.getCause() instanceof PendingAckHandleReplayException) { PersistentSubscription subscription = subscriptions.remove(subscriptionName); if (subscription != null) { subscription.retryClose(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 1f049810d81c9..e9a65ee5e5fd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -424,7 +424,7 @@ public void testUpdateCursorLastActive() throws Exception { @Test - public void testPendingAckHandleRelayFailedWhenCreateNewSub() throws Exception { + public void testPendingAckHandleReplayFailedWhenCreateNewSub() throws Exception { String topic = "persistent://prop/ns-123/aTopic"; admin.namespaces().createNamespace(TopicName.get(topic).getNamespace()); admin.topics().createNonPartitionedTopic(topic); @@ -438,16 +438,33 @@ public void testPendingAckHandleRelayFailedWhenCreateNewSub() throws Exception { PersistentSubscription subscription = Mockito.mock(PersistentSubscription.class); PendingAckHandleImpl pendingAckHandle = Mockito.mock(PendingAckHandleImpl.class); + Mockito.doReturn(CompletableFuture .failedFuture(new PendingAckHandleReplayException(new RuntimeException("This is an exception")))) - .when(pendingAckHandle.pendingAckHandleFuture()); - + .when(pendingAckHandle).pendingAckHandleFuture(); + Mockito.doReturn(mpt).when(subscription).getTopic(); Mockito.doReturn(pendingAckHandle).when(subscription).getPendingAckHandle(); - Mockito.doReturn(CompletableFuture.completedFuture(subscription)).when(mpt).getDurableSubscription(Mockito.any(), Mockito.any(), + + + Mockito.doAnswer(inv -> { + String subName = inv.getArgument(0); + mpt.getSubscriptions().put(subName, subscription); + return CompletableFuture.completedFuture(subscription); + }).when(mpt).getDurableSubscription(Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); - Mockito.doReturn(CompletableFuture.completedFuture(subscription)).when(mpt).getNonDurableSubscription(Mockito.anyString(), Mockito.any(), + + Mockito.doAnswer(inv -> { + String subName = inv.getArgument(0); + mpt.getSubscriptions().put(subName, subscription); + return CompletableFuture.completedFuture(subscription); + }).when(mpt).getNonDurableSubscription(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); + + Mockito.doReturn(CompletableFuture + .failedFuture(new PendingAckHandleReplayException(new RuntimeException("This is an exception")))) + .when(subscription).addConsumer(Mockito.any()); + try (Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("test_sub")