diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 618cc08956022..8aa8ea8f6c597 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1606,8 +1606,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); if (topicFuture.isCompletedExceptionally()) { + // Check create persistent topic timeout. log.warn("{} future is already completed with failure {}, closing the" + " topic", topic, FutureUtil.getException(topicFuture)); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers() .whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 5a9c928ca3c90..aa98fc7d70106 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; @@ -42,8 +43,11 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + +import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -116,7 +120,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep Class topicKlass = inv.getArgument(3); if (topicKlass.equals(PersistentTopic.class)) { PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); - CompletableFuture f =CompletableFuture + CompletableFuture f = CompletableFuture .failedFuture(new ManagedLedgerException("This is an exception")); Mockito.doReturn(f).when(pt).checkDeduplicationStatus(); reference.set(pt); @@ -140,4 +144,39 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep Assert.assertEquals(ttb.getState(), expectState); } + + @Test + public void testCloseTransactionBufferWhenTimeout() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference reference = new AtomicReference<>(); + pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10); + long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1); + + Mockito + .doAnswer(inv -> { + Thread.sleep(topicLoadTimeout); + PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod(); + reference.set(persistentTopic); + return persistentTopic; + }) + .when(brokerService) + .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), + Mockito.eq(PersistentTopic.class)); + + CompletableFuture> f = brokerService.getTopic(topic, true); + + Awaitility.waitAtMost(20, TimeUnit.SECONDS) + .pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Assert.assertEquals(ttb.getState(), expectState); + Assert.assertTrue(f.isCompletedExceptionally()); + } + }