From e3fe878ec904d3b60a8a50daba315f99603c61c7 Mon Sep 17 00:00:00 2001 From: Tao Jiuming <95597048+tjiuming@users.noreply.github.com> Date: Tue, 17 Jan 2023 11:40:46 +0800 Subject: [PATCH 1/2] [fix][broker] Close transactionBuffer after MessageDeduplication#checkStatus failed (#19157) (cherry picked from commit d25cf8ee9bdb5cabcb8a277524dff460e186d40a) --- .../pulsar/broker/service/BrokerService.java | 17 +++++- .../buffer/TopicTransactionBufferTest.java | 56 ++++++++++++++++++- 2 files changed, 71 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b437f9768376a..4151887be3fb9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1385,6 +1385,14 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean }); } + + @VisibleForTesting + public void createPersistentTopic0(final String topic, boolean createIfMissing, + CompletableFuture> topicFuture, + Map properties) { + createPersistentTopic(topic, createIfMissing, topicFuture, properties); + } + private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { @@ -1459,6 +1467,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { .exceptionally((ex) -> { log.warn("Replication or dedup check failed." + " Removing topic from topics list {}, {}", topic, ex); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); topicFuture.completeExceptionally(ex); @@ -3187,7 +3201,8 @@ public long getPausedConnections() { } @SuppressWarnings("unchecked") - private T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService, + @VisibleForTesting + public T newTopic(String topic, ManagedLedger ledger, BrokerService brokerService, Class topicClazz) throws PulsarServerException { if (topicFactory != null) { try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 576ef647248d4..7ef0f98526499 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,9 +18,16 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.TopicName; @@ -30,11 +37,17 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.powermock.reflect.Whitebox; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Collections; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class TopicTransactionBufferTest extends TransactionTestBase { @@ -86,4 +99,45 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed); txn.commit().get(); } + + @Test + public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference reference = new AtomicReference<>(); + + Mockito + .doAnswer(inv -> { + String topic1 = inv.getArgument(0); + ManagedLedger ledger = inv.getArgument(1); + BrokerService service = inv.getArgument(2); + Class topicKlass = inv.getArgument(3); + if (topicKlass.equals(PersistentTopic.class)) { + PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); + CompletableFuture f =CompletableFuture + .failedFuture(new ManagedLedgerException("This is an exception")); + Mockito.doReturn(f).when(pt).checkDeduplicationStatus(); + reference.set(pt); + return pt; + } else { + return new NonPersistentTopic(topic1, service); + } + }) + .when(brokerService) + .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), + Mockito.eq(PersistentTopic.class)); + + brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); + + Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Assert.assertEquals(ttb.getState(), expectState); + } + } From fd85c8fedae7fa9de4ac8752f114831ebb843ed7 Mon Sep 17 00:00:00 2001 From: daojun Date: Fri, 20 Jan 2023 10:05:31 +0800 Subject: [PATCH 2/2] fix license header --- .../broker/transaction/buffer/TopicTransactionBufferTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 7ef0f98526499..ee2649fb5c445 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information