diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 2012aa06b3006..49045f862a500 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -31,6 +31,8 @@ import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -890,6 +892,23 @@ public CompletableFuture close() { } } + protected void retryClose() { + ScheduledExecutorService scheduler = topic.getBrokerService().pulsar().getExecutor(); + scheduler.schedule(() -> { + close() + .thenAccept(unused -> { + if (log.isDebugEnabled()) { + log.debug("Close subscription {} finished.", subName); + } + }) + .exceptionally(t -> { + log.warn("Close subscription {} failed.", subName, t); + retryClose(); + return null; + }); + }, 20, TimeUnit.SECONDS); + } + /** * Disconnect all consumers attached to the dispatcher and close this subscription. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2b951cd9e3b12..37e8cee6046a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -116,6 +116,7 @@ import org.apache.pulsar.broker.stats.ReplicationMetrics; import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable; +import org.apache.pulsar.broker.transaction.pendingack.exceptions.PendingAckHandleReplayException; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.OffloadProcessStatus; @@ -298,12 +299,23 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS // ignore it for now and let the message dedup logic to take care of it } else { final String subscriptionName = Codec.decode(cursor.getName()); - subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, + PersistentSubscription subscription = createPersistentSubscription(subscriptionName, cursor, PersistentSubscription.isCursorFromReplicatedSubscription(cursor), - cursor.getCursorProperties())); - // subscription-cursor gets activated by default: deactivate as there is no active subscription right - // now - subscriptions.get(subscriptionName).deactivateCursor(); + cursor.getCursorProperties()); + subscriptions.put(subscriptionName, subscription); + subscription.getPendingAckHandle() + .pendingAckHandleFuture() + .exceptionally(t -> { + log.warn("PersistentSubscription [{}] pendingAckHandleImpl relay failed " + + "when initialize topic [{}].", subscriptionName, topic, t); + if (subscriptions.remove(subscriptionName, subscription)) { + subscription.retryClose(); + } else { + log.warn("[{}] Remove subscription {} from subscriptions failed.", + topic, subscriptionName); + } + return null; + }); } } this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger); @@ -887,6 +899,13 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException && isCompactionSubscription(subscriptionName)) { log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage()); + } else if (ex.getCause() instanceof PendingAckHandleReplayException) { + PersistentSubscription subscription = subscriptions.remove(subscriptionName); + if (subscription != null) { + subscription.retryClose(); + } + log.warn("[{}] Failed to create subscription {} due to PendingAckHandle recover failed.", + topic, subscriptionName, ex); } else { log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex); } @@ -910,7 +929,8 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs replicatedSubscriptionStateArg, keySharedMeta, null, DEFAULT_CONSUMER_EPOCH); } - private CompletableFuture getDurableSubscription(String subscriptionName, + @VisibleForTesting + public CompletableFuture getDurableSubscription(String subscriptionName, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated, Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); @@ -970,7 +990,8 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { return subscriptionFuture; } - private CompletableFuture getNonDurableSubscription(String subscriptionName, + @VisibleForTesting + public CompletableFuture getNonDurableSubscription(String subscriptionName, MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean isReadCompacted, Map subscriptionProperties) { log.info("[{}][{}] Creating non-durable subscription at msg id {} - {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/PendingAckHandleReplayException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/PendingAckHandleReplayException.java new file mode 100644 index 0000000000000..97545e3dda16e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/exceptions/PendingAckHandleReplayException.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction.pendingack.exceptions; + +public class PendingAckHandleReplayException extends Exception { + public PendingAckHandleReplayException(Throwable t) { + super(t); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index ed78feb453d1d..51babd98c4de4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -53,6 +53,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandleStats; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; +import org.apache.pulsar.broker.transaction.pendingack.exceptions.PendingAckHandleReplayException; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; @@ -945,7 +946,8 @@ public void completeHandleFuture() { } public void exceptionHandleFuture(Throwable t) { - final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t); + final boolean completedNow = this.pendingAckHandleCompletableFuture + .completeExceptionally(new PendingAckHandleReplayException(t)); if (completedNow) { recoverTime.setRecoverEndTime(System.currentTimeMillis()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 6f9c260c8ffaf..e9a65ee5e5fd1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -36,8 +36,10 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -45,16 +47,31 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.broker.transaction.pendingack.exceptions.PendingAckHandleReplayException; +import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; import org.junit.Assert; +import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -281,8 +298,8 @@ public void testPersistentPartitionedTopicUnload() throws Exception { @DataProvider(name = "topicAndMetricsLevel") public Object[][] indexPatternTestData() { return new Object[][]{ - new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", true}, - new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", false}, + new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", true}, + new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", false}, }; } @@ -367,8 +384,8 @@ public void testUpdateCursorLastActive() throws Exception { Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); - PersistentSubscription persistentSubscription = topic.getSubscription(sharedSubName); - PersistentSubscription persistentSubscription2 = topic.getSubscription(failoverSubName); + PersistentSubscription persistentSubscription = topic.getSubscription(sharedSubName); + PersistentSubscription persistentSubscription2 = topic.getSubscription(failoverSubName); // `addConsumer` should update last active assertTrue(persistentSubscription.getCursor().getLastActive() > beforeAddConsumerTimestamp); @@ -406,6 +423,163 @@ public void testUpdateCursorLastActive() throws Exception { } + @Test + public void testPendingAckHandleReplayFailedWhenCreateNewSub() throws Exception { + String topic = "persistent://prop/ns-123/aTopic"; + admin.namespaces().createNamespace(TopicName.get(topic).getNamespace()); + admin.topics().createNonPartitionedTopic(topic); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topic, false).get().get(); + Assert.assertNotNull(persistentTopic); + + PersistentTopic mpt = Mockito.spy(persistentTopic); + pulsar.getBrokerService().getTopics().put(topic, CompletableFuture.completedFuture(Optional.of(mpt))); + + PersistentSubscription subscription = Mockito.mock(PersistentSubscription.class); + PendingAckHandleImpl pendingAckHandle = Mockito.mock(PendingAckHandleImpl.class); + + Mockito.doReturn(CompletableFuture + .failedFuture(new PendingAckHandleReplayException(new RuntimeException("This is an exception")))) + .when(pendingAckHandle).pendingAckHandleFuture(); + Mockito.doReturn(mpt).when(subscription).getTopic(); + Mockito.doReturn(pendingAckHandle).when(subscription).getPendingAckHandle(); + + + Mockito.doAnswer(inv -> { + String subName = inv.getArgument(0); + mpt.getSubscriptions().put(subName, subscription); + return CompletableFuture.completedFuture(subscription); + }).when(mpt).getDurableSubscription(Mockito.any(), Mockito.any(), + Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); + + Mockito.doAnswer(inv -> { + String subName = inv.getArgument(0); + mpt.getSubscriptions().put(subName, subscription); + return CompletableFuture.completedFuture(subscription); + }).when(mpt).getNonDurableSubscription(Mockito.anyString(), Mockito.any(), + Mockito.any(), Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyMap()); + + + Mockito.doReturn(CompletableFuture + .failedFuture(new PendingAckHandleReplayException(new RuntimeException("This is an exception")))) + .when(subscription).addConsumer(Mockito.any()); + + try (Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub") + .subscribe()) { + Assert.fail(); + } catch (Exception t) { + // ignore + } + + try (Reader reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub1") + .startMessageId(MessageId.earliest) + .startMessageFromRollbackDuration(0, TimeUnit.SECONDS) + .create()) { + Assert.fail(); + } catch (Exception t) { + // ignore + } + + Assert.assertEquals(mpt.getSubscriptions().size(), 0); + + pulsar.getBrokerService().getTopics() + .put(topic, CompletableFuture.completedFuture(Optional.of(persistentTopic))); + + + try (Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub") + .subscribe(); + Reader reader = pulsarClient.newReader(Schema.STRING) + .topic(topic) + .subscriptionName("test_sub1") + .startMessageId(MessageId.earliest) + .startMessageFromRollbackDuration(0, TimeUnit.SECONDS) + .create()) { + Assert.assertEquals(persistentTopic.getSubscriptions().size(), 2); + } catch (Exception t) { + Assert.fail(); + } + } + + @Test + public void testPendingAckHandleRelayFailedWhenReloadTopic() throws Exception { + String topic = "persistent://prop/ns-123/btopic_" + UUID.randomUUID(); + admin.namespaces().createNamespace(TopicName.get(topic).getNamespace()); + admin.topics().createNonPartitionedTopic(topic); + + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topic, false).get().get(); + Assert.assertNotNull(persistentTopic); + + try (Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .enableBatching(false) + .create()) { + for (int a = 0; a < 100; a++) { + producer.send(UUID.randomUUID().toString()); + } + } + + int processed = 0; + try (Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .isAckReceiptEnabled(true) + .subscriptionName("test_sub") + .subscribe()) { + for (int a = 0; a < 50; a++) { + Message message = consumer.receive(20, TimeUnit.SECONDS); + if (message != null) { + consumer.acknowledge(message); + processed++; + } else { + break; + } + } + } + + + CountDownLatch latch = new CountDownLatch(1); + + // Mock reload topic subscriptions failed. + Subscription subscription = persistentTopic.getSubscriptions().remove("test_sub"); + subscription.close() + .thenAccept(unused -> latch.countDown()); + + if (!latch.await(1, TimeUnit.MINUTES)) { + Assert.fail(); + } + + int processed1 = 0; + try (Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .isAckReceiptEnabled(true) + .subscriptionName("test_sub") + .subscribe()) { + while (true) { + Message message = consumer1.receive(10, TimeUnit.SECONDS); + if (null != message) { + consumer1.acknowledge(message); + processed1++; + } else { + break; + } + } + } + + Assert.assertEquals(processed + processed1, 100); + } + + @Test public void testCreateNonExistentPartitions() throws PulsarAdminException, PulsarClientException { final String topicName = "persistent://prop/ns-abc/testCreateNonExistentPartitions";