Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,13 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
});
}

@VisibleForTesting
public void createPersistentTopic0(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture,
Map<String, String> properties) {
createPersistentTopic(topic, createIfMissing, topicFuture, properties);
}

private void createPersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture,
Map<String, String> properties) {
Expand Down Expand Up @@ -1434,7 +1441,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
try {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: new PersistentTopic(topic, ledger, BrokerService.this);
: newPersistentTopic(topic, ledger, BrokerService.this);
persistentTopic
.initialize()
.thenCompose(__ -> persistentTopic.preCreateSubscriptionForCompactionIfNeeded())
Expand Down Expand Up @@ -1464,6 +1471,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
.exceptionally((ex) -> {
log.warn("Replication or dedup check failed."
+ " Removing topic from topics list {}, {}", topic, ex);
persistentTopic.getTransactionBuffer()
.closeAsync()
.exceptionally(t -> {
log.error("[{}] Close transactionBuffer failed", topic, t);
return null;
});
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
Expand Down Expand Up @@ -3085,6 +3098,11 @@ public long getPausedConnections() {
return pausedConnections.longValue();
}

@VisibleForTesting
public PersistentTopic newPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) {
return new PersistentTopic(topic, ledger, brokerService);
}

@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pulsar.broker.transaction.buffer;

import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -30,11 +37,17 @@
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TopicTransactionBufferTest extends TransactionTestBase {

Expand Down Expand Up @@ -86,4 +99,43 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception {
Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
txn.commit().get();
}

@Test
public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
PulsarService pulsar = pulsarServiceList.get(0);
BrokerService brokerService0 = pulsar.getBrokerService();
BrokerService brokerService = Mockito.spy(brokerService0);
AtomicReference<PersistentTopic> reference = new AtomicReference<>();

Mockito
.doAnswer(inv -> {
String topic1 = inv.getArgument(0);
ManagedLedger ledger = inv.getArgument(1);
BrokerService service = inv.getArgument(2);
if (TopicName.get(topic1).isPersistent()) {
PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
CompletableFuture<Void> f = new CompletableFuture<>();
f.completeExceptionally(new ManagedLedgerException("This is an exception"));
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
reference.set(pt);
return pt;
} else {
return new NonPersistentTopic(topic1, service);
}
})
.when(brokerService)
.newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService));

brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>(), Collections.emptyMap());

Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null);
PersistentTopic persistentTopic = reference.get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
Assert.assertEquals(ttb.getState(), expectState);
}

}