From 4b98829939c8083ed5e1090d7996288931fba059 Mon Sep 17 00:00:00 2001 From: Tao Jiuming <95597048+tjiuming@users.noreply.github.com> Date: Mon, 6 Feb 2023 15:32:59 +0800 Subject: [PATCH 1/3] [fix] Close TransactionBuffer when create persistent topic timeout (#19384) (cherry picked from commit 8eb7ee1e9e96d4686e452320cdaba92d5eca7b4f) --- .../mledger/impl/ManagedCursorImpl.java | 6 --- .../mledger/impl/ManagedLedgerImpl.java | 4 +- .../pulsar/broker/service/BrokerService.java | 7 ++++ .../pulsar/broker/service/ServerCnx.java | 3 +- .../broker/admin/IncrementPartitionsTest.java | 3 +- .../broker/transaction/TransactionTest.java | 1 + .../buffer/TopicTransactionBufferTest.java | 38 +++++++++++++++++++ 7 files changed, 51 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 09322382a468c..71948f547c217 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -204,12 +204,6 @@ public class ManagedCursorImpl implements ManagedCursor { // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. private volatile boolean isActive = false; - // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. - private volatile boolean isActive = false; - - // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. - private volatile boolean isActive = false; - class MarkDeleteEntry { final PositionImpl newPosition; final MarkDeleteCallback callback; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a05b6e451ac22..162240b44f09c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -215,9 +215,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex offloadMutex = new CallbackMutex(); private static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionImpl.LATEST); - private volatile LedgerHandle currentLedger; + protected volatile LedgerHandle currentLedger; private long currentLedgerEntries = 0; - private long currentLedgerSize = 0; + protected long currentLedgerSize = 0; private long lastLedgerCreatedTimestamp = 0; private long lastLedgerCreationFailureTimestamp = 0; private long lastLedgerCreationInitiationTimestamp = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 61eff24a78fc7..0ea95995f4b32 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1457,8 +1457,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); if (topicFuture.isCompletedExceptionally()) { + // Check create persistent topic timeout. log.warn("{} future is already completed with failure {}, closing the" + " topic", topic, FutureUtil.getException(topicFuture)); + persistentTopic.getTransactionBuffer() + .closeAsync() + .exceptionally(t -> { + log.error("[{}] Close transactionBuffer failed", topic, t); + return null; + }); persistentTopic.stopReplProducers() .whenCompleteAsync((v, exception) -> { topics.remove(topic, topicFuture); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1f08db86aafcc..6cc89dfe03019 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -134,7 +134,6 @@ import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.naming.Metadata; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; @@ -1261,7 +1260,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { // Check whether the producer will publish encrypted messages or not if ((topic.isEncryptionRequired() || encryptionRequireOnProducer) && !isEncrypted - && !SystemTopicNames.isSystemTopic(topicName)) { + && !topic.getBrokerService().isSystemTopic(topicName)) { String msg = String.format("Encryption is required in %s", topicName); log.warn("[{}] {}", remoteAddress, msg); if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java index e156678a394c6..6d1444fa3a44e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java @@ -136,7 +136,8 @@ public void testIncrementPartitionsOfTopicWithSubscriptionProperties() throws Ex assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20); assertEquals(admin.topics().getSubscriptions( - TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1")); + TopicName.get(partitionedTopicName).getPartition(15).toString()), + Lists.newArrayList("sub-1")); TopicStats stats = admin.topics() .getStats(TopicName.get(partitionedTopicName).getPartition(15).toString()); Map subscriptionProperties = stats.getSubscriptions() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index a2c6e7dbafcc8..2da7502dc2575 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -39,6 +39,7 @@ import io.netty.util.Timeout; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.HashMap; import java.util.Map; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java index 1cf190056afb5..ddb9af94f5605 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java @@ -23,6 +23,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.TransactionTestBase; @@ -42,8 +43,11 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; + +import java.time.Duration; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -138,4 +142,38 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep Assert.assertEquals(ttb.getState(), expectState); } + + @Test + public void testCloseTransactionBufferWhenTimeout() throws Exception { + String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID(); + PulsarService pulsar = pulsarServiceList.get(0); + BrokerService brokerService0 = pulsar.getBrokerService(); + BrokerService brokerService = Mockito.spy(brokerService0); + AtomicReference reference = new AtomicReference<>(); + pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10); + long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1); + + Mockito + .doAnswer(inv -> { + Thread.sleep(topicLoadTimeout); + PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod(); + reference.set(persistentTopic); + return persistentTopic; + }) + .when(brokerService) + .newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService)); + + CompletableFuture> f = brokerService.getTopic(topic, true); + + Awaitility.waitAtMost(20, TimeUnit.SECONDS) + .pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null); + PersistentTopic persistentTopic = reference.get(); + TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); + Assert.assertTrue(buffer instanceof TopicTransactionBuffer); + TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer; + TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close; + Assert.assertEquals(ttb.getState(), expectState); + Assert.assertTrue(f.isCompletedExceptionally()); + } + } From 0009a78191b9a1ab57389d4f7ac063370dd222c2 Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Tue, 7 Feb 2023 23:35:04 +0800 Subject: [PATCH 2/3] checkstyle --- .../org/apache/pulsar/broker/admin/IncrementPartitionsTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java index 6d1444fa3a44e..cae1fa1e2e221 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.Sets; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import lombok.Cleanup; From 91a82019841134e7573566fb4c33b83f6ee6e3ac Mon Sep 17 00:00:00 2001 From: xiangying <1984997880@qq.com> Date: Wed, 8 Feb 2023 00:45:57 +0800 Subject: [PATCH 3/3] checkstyle --- .../mledger/impl/MangedLedgerInterceptorImplTest2.java | 2 +- .../service/persistent/PartitionKeywordCompatibilityTest.java | 2 +- .../apache/pulsar/compaction/GetLastMessageIdCompactedTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java index 080ef9ea4c5fe..d3e11311de739 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java index 58b9dcee628b3..f815fef1c8772 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PartitionKeywordCompatibilityTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java index 0be9fa407542f..b1f5de71e5ec7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information