Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,12 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean
});
}

@VisibleForTesting
public void createPersistentTopic0(final String topic, boolean createIfMissing,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't use BrokerService#getTopic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we can use the method, but when I wrote the test, I chose this method

CompletableFuture<Optional<Topic>> topicFuture) {
createPersistentTopic(topic, createIfMissing, topicFuture);
}

private void createPersistentTopic(final String topic, boolean createIfMissing,
CompletableFuture<Optional<Topic>> topicFuture) {
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
Expand Down Expand Up @@ -1367,7 +1373,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
try {
PersistentTopic persistentTopic = isSystemTopic(topic)
? new SystemTopic(topic, ledger, BrokerService.this)
: new PersistentTopic(topic, ledger, BrokerService.this);
: newPersistentTopic(topic, ledger, BrokerService.this);
CompletableFuture<Void> preCreateSubForCompaction =
persistentTopic.preCreateSubscriptionForCompactionIfNeeded();
CompletableFuture<Void> replicationFuture = persistentTopic
Expand Down Expand Up @@ -1400,6 +1406,12 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
"Replication or dedup check failed."
+ " Removing topic from topics list {}, {}",
topic, ex);
persistentTopic.getTransactionBuffer()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add this similar logic to line 1395?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch!
it makes sense, in line 1395, it seems like checking create topic timeout, it's also need to close transaction buffer.
I'll create another PR to add the logic

.closeAsync()
.exceptionally(t -> {
log.error("[{}] Close transactionBuffer failed", topic, t);
return null;
});
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
topicFuture.completeExceptionally(ex);
Expand Down Expand Up @@ -2947,6 +2959,11 @@ public long getPausedConnections() {
return pausedConnections.longValue();
}

@VisibleForTesting
public PersistentTopic newPersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService){
return new PersistentTopic(topic, ledger, brokerService);
}

@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pulsar.broker.transaction.buffer;

import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -30,11 +37,17 @@
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class TopicTransactionBufferTest extends TransactionTestBase {

Expand Down Expand Up @@ -86,4 +99,43 @@ public void testTransactionBufferAppendMarkerWriteFailState() throws Exception {
Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", ManagedLedgerImpl.State.WriteFailed);
txn.commit().get();
}

@Test
public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
PulsarService pulsar = pulsarServiceList.get(0);
BrokerService brokerService0 = pulsar.getBrokerService();
BrokerService brokerService = Mockito.spy(brokerService0);
AtomicReference<PersistentTopic> reference = new AtomicReference<>();

Mockito
.doAnswer(inv -> {
String topic1 = inv.getArgument(0);
ManagedLedger ledger = inv.getArgument(1);
BrokerService service = inv.getArgument(2);
if (TopicName.get(topic1).isPersistent()) {
PersistentTopic pt = Mockito.spy(new PersistentTopic(topic1, ledger, service));
CompletableFuture<Void> f = new CompletableFuture<>();
f.completeExceptionally(new ManagedLedgerException("This is an exception"));
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
reference.set(pt);
return pt;
} else {
return new NonPersistentTopic(topic1, service);
}
})
.when(brokerService)
.newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService));

brokerService.createPersistentTopic0(topic, true, new CompletableFuture<>());

Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() != null);
PersistentTopic persistentTopic = reference.get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
Assert.assertEquals(ttb.getState(), expectState);
}

}