From 6de51f55712056fcbab324ecdf7739298ff0d2a3 Mon Sep 17 00:00:00 2001 From: Tao Jiuming <95597048+tjiuming@users.noreply.github.com> Date: Tue, 17 Jan 2023 11:40:46 +0800 Subject: [PATCH 1/6] [fix][broker] Close transactionBuffer after MessageDeduplication#checkStatus failed (#19157) (cherry picked from commit d25cf8ee9bdb5cabcb8a277524dff460e186d40a) --- .../buffer/TopicTransactionBufferTest.java | 56 ++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 576ef647248d4..45e0c71be8f0f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,9 +18,16 @@ */ package org.apache.pulsar.broker.transaction.buffer; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.naming.TopicName; @@ -30,11 +37,17 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.powermock.reflect.Whitebox; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Collections; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class TopicTransactionBufferTest extends TransactionTestBase { @@ -86,4 +99,45 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception { Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed); txn.commit().get(); } + + @Test + public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference reference = new AtomicReference<>(); + + Mockito + .doAnswer(inv -> { + String topic1 = inv.getArgument(0); + ManagedLedger ledger = inv.getArgument(1); + BrokerService service = inv.getArgument(2); + Class topicKlass = inv.getArgument(3); + if (topicKlass.equals(PersistentTopic.class)) { + PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new ManagedLedgerException("This is an exception")); + Mockito.doReturn(f).when(pt).checkDeduplicationStatus(); + reference.set(pt); + return pt; + } else { + return new NonPersistentTopic(topic1, service); + } + }) + .when(brokerService) + .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), + Mockito.eq(PersistentTopic.class)); + + brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); + + Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Assert.assertEquals(ttb.getState(), expectState); + } + } From e79b8f4efd33d43667dfb85c1797340c89480796 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 19 Jan 2023 20:12:09 +0800 Subject: [PATCH 2/6] Close transactionBuffer when MessageDeduplication#checkStatus failed --- .../pulsar/broker/service/BrokerService.java | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index dda76e837673e..61eff24a78fc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1390,6 +1390,13 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean }); } + @VisibleForTesting + public void createPersistentTopic0(final String topic, boolean createIfMissing, + CompletableFuture> topicFuture, + Map properties) { + createPersistentTopic(topic, createIfMissing, topicFuture, properties); + } + private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { @@ -1434,7 +1441,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { try { PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) - : new PersistentTopic(topic, ledger, BrokerService.this); + : newPersistentTopic(topic, ledger, BrokerService.this); persistentTopic .initialize() .thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded()) @@ -1464,6 +1471,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { .exceptionally((ex) -> { log.warn("Replication or dedup check failed." + " Removing topic from topics list {}, {}", topic, ex); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); topicFuture.completeExceptionally(ex); @@ -3085,6 +3098,11 @@ public long getPausedConnections() { return pausedConnections.longValue(); } + @VisibleForTesting + public PersistentTopic newPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) { + return new PersistentTopic(topic, ledger, brokerService); + } + @VisibleForTesting public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) { this.pulsarChannelInitFactory = factory; From dec3dd65ecd7bf613fc2068c6761cb7f60287185 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 19 Jan 2023 20:12:30 +0800 Subject: [PATCH 3/6] Close transactionBuffer when MessageDeduplication#checkStatus failed --- .../broker/transaction/buffer/TopicTransactionBufferTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 45e0c71be8f0f..0da3eb6442730 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -126,8 +126,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep } }) .when(brokerService) - .newTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService), - Mockito.eq(PersistentTopic.class)); + .newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService)); brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); From 7705176bf6e51e0e3a79037b81929b754f742533 Mon Sep 17 00:00:00 2001 From: daojun Date: Thu, 19 Jan 2023 20:13:34 +0800 Subject: [PATCH 4/6] Close transactionBuffer when MessageDeduplication#checkStatus failed --- .../broker/transaction/buffer/TopicTransactionBufferTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 0da3eb6442730..72a6ff7226243 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information From d494be43fa6f81e53960a35b9d59f4e04276f934 Mon Sep 17 00:00:00 2001 From: daojun Date: Mon, 30 Jan 2023 16:02:31 +0800 Subject: [PATCH 5/6] fix test --- .../broker/transaction/buffer/TopicTransactionBufferTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 72a6ff7226243..f129d16ff98f3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -130,7 +130,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); - Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); + Awaitility.waitAtMost(3, TimeUnit.MINUTES).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Assert.assertTrue(buffer instanceof TopicTransactionBuffer); From c7766e183e1c1148a79631eb99ba06316eb908b1 Mon Sep 17 00:00:00 2001 From: daojun Date: Tue, 31 Jan 2023 12:50:30 +0800 Subject: [PATCH 6/6] fix test --- .../transaction/buffer/TopicTransactionBufferTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index f129d16ff98f3..1cf190056afb5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -113,8 +113,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep String topic1 = inv.getArgument(0); ManagedLedger ledger = inv.getArgument(1); BrokerService service = inv.getArgument(2); - Class topicKlass = inv.getArgument(3); - if (topicKlass.equals(PersistentTopic.class)) { + if (TopicName.get(topic1).isPersistent()) { PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service)); CompletableFuture f = new CompletableFuture<>(); f.completeExceptionally(new ManagedLedgerException("This is an exception")); @@ -130,7 +129,7 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap()); - Awaitility.waitAtMost(3, TimeUnit.MINUTES).until(() -> reference.get() != null); + Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null); PersistentTopic persistentTopic = reference.get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Assert.assertTrue(buffer instanceof TopicTransactionBuffer);