From d7dcc17ef9b8072a7884597db02371febd2ec40a Mon Sep 17 00:00:00 2001 From: daojun Date: Wed, 1 Feb 2023 18:13:31 +0800 Subject: [PATCH 1/2] Close TransactionBuffer when create persistent topic timeout --- .../pulsar/broker/service/BrokerService.java | 7 ++++ .../buffer/TopicTransactionBufferTest.java | 40 ++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 618cc08956022..8aa8ea8f6c597 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1606,8 +1606,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); if (topicFuture.isCompletedExceptionally()) { + // Check create persistent topic timeout. log.warn("{} future is already completed with failure {}, closing the" + " topic", topic, FutureUtil.getException(topicFuture)); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers() .whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 5a9c928ca3c90..d774c9a2b033f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; @@ -42,8 +43,11 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + +import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -116,7 +120,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep Class topicKlass = inv.getArgument(3); if (topicKlass.equals(PersistentTopic.class)) { PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); - CompletableFuture f =CompletableFuture + CompletableFuture f = CompletableFuture .failedFuture(new ManagedLedgerException("This is an exception")); Mockito.doReturn(f).when(pt).checkDeduplicationStatus(); reference.set(pt); @@ -140,4 +144,38 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep Assert.assertEquals(ttb.getState(), expectState); } + + @Test + public void testCloseTransactionBufferWhenTimeout() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference reference = new AtomicReference<>(); + long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1); + + Mockito + .doAnswer(inv -> { + Thread.sleep(topicLoadTimeout); + PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod(); + reference.set(persistentTopic); + return persistentTopic; + }) + .when(brokerService) + .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), + Mockito.eq(PersistentTopic.class)); + + CompletableFuture> f = brokerService.getTopic(topic, true); + + Awaitility.waitAtMost(90, TimeUnit.SECONDS) + .pollInterval(Duration.ofSeconds(5)).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Assert.assertEquals(ttb.getState(), expectState); + Assert.assertTrue(f.isCompletedExceptionally()); + } + } From 546b67581ddfebdbdd454c66115a2834ef9ef46f Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 2 Feb 2023 13:33:43 +0800 Subject: [PATCH 2/2] Close TransactionBuffer when create persistent topic timeout --- .../transaction/buffer/TopicTransactionBufferTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index d774c9a2b033f..aa98fc7d70106 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -152,6 +152,7 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception { BrokerService brokerService0 = pulsar.getBrokerService(); BrokerService brokerService = Mockito.spy(brokerService0); AtomicReference reference = new AtomicReference<>(); + pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10); long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1); Mockito @@ -167,8 +168,8 @@ public void testCloseTransactionBufferWhenTimeout() throws Exception { CompletableFuture> f = brokerService.getTopic(topic, true); - Awaitility.waitAtMost(90, TimeUnit.SECONDS) - .pollInterval(Duration.ofSeconds(5)).until(() -> reference.get() != null); + Awaitility.waitAtMost(20, TimeUnit.SECONDS) + .pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Assert.assertTrue(buffer instanceof TopicTransactionBuffer);