diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 10ae4fa118fe0..58c5987dfa0b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1325,6 +1325,12 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean }); } + @VisibleForTesting + public void createPersistentTopic0(final String topic, boolean createIfMissing, + CompletableFuture> topicFuture) { + createPersistentTopic(topic, createIfMissing, topicFuture); + } + private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -1367,7 +1373,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { try { PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) - : new PersistentTopic(topic, ledger, BrokerService.this); + : newPersistentTopic(topic, ledger, BrokerService.this); CompletableFuture preCreateSubForCompaction = persistentTopic.preCreateSubscriptionForCompactionIfNeeded(); CompletableFuture replicationFuture = persistentTopic @@ -1400,6 +1406,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { "Replication or dedup check failed." + " Removing topic from topics list {}, {}", topic, ex); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); topicFuture.completeExceptionally(ex); @@ -2947,6 +2959,11 @@ public long getPausedConnections() { return pausedConnections.longValue(); } + @VisibleForTesting + public PersistentTopic newPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService){ + return new PersistentTopic(topic, ledger, brokerService); + } + @VisibleForTesting public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) { this.pulsarChannelInitFactory = factory; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 576ef647248d4..7f1a5a8042afd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -18,9 +18,16 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.TopicName; @@ -30,11 +37,17 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.powermock.reflect.Whitebox; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Collections; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class TopicTransactionBufferTest extends TransactionTestBase { @@ -86,4 +99,43 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed); txn.commit().get(); } + + @Test + public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference reference = new AtomicReference<>(); + + Mockito + .doAnswer(inv -> { + String topic1 = inv.getArgument(0); + ManagedLedger ledger = inv.getArgument(1); + BrokerService service = inv.getArgument(2); + if (TopicName.get(topic1).isPersistent()) { + PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new ManagedLedgerException("This is an exception")); + Mockito.doReturn(f).when(pt).checkDeduplicationStatus(); + reference.set(pt); + return pt; + } else { + return new NonPersistentTopic(topic1, service); + } + }) + .when(brokerService) + .newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService)); + + brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>()); + + Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Assert.assertEquals(ttb.getState(), expectState); + } + }