Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,6 @@ public class ManagedCursorImpl implements ManagedCursor {
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
private volatile LedgerHandle currentLedger;
protected volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
protected long currentLedgerSize = 0;
private long lastLedgerCreatedTimestamp = 0;
private long lastLedgerCreationFailureTimestamp = 0;
private long lastLedgerCreationInitiationTimestamp = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,8 +1457,15 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
- topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if (topicFuture.isCompletedExceptionally()) {
// Check create persistent topic timeout.
log.warn("{} future is already completed with failure {}, closing the"
+ " topic", topic, FutureUtil.getException(topicFuture));
persistentTopic.getTransactionBuffer()
.closeAsync()
.exceptionally(t -> {
log.error("[{}] Close transactionBuffer failed", topic, t);
return null;
});
persistentTopic.stopReplProducers()
.whenCompleteAsync((v, exception) -> {
topics.remove(topic, topicFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand Down Expand Up @@ -1264,7 +1263,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// Check whether the producer will publish encrypted messages or not
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer)
&& !isEncrypted
&& !SystemTopicNames.isSystemTopic(topicName)) {
&& !topic.getBrokerService().isSystemTopic(topicName)) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import lombok.Cleanup;
Expand Down Expand Up @@ -136,7 +135,8 @@ public void testIncrementPartitionsOfTopicWithSubscriptionProperties() throws Ex
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);

assertEquals(admin.topics().getSubscriptions(
TopicName.get(partitionedTopicName).getPartition(15).toString()), List.of("sub-1"));
TopicName.get(partitionedTopicName).getPartition(15).toString()),
Lists.newArrayList("sub-1"));
TopicStats stats = admin.topics()
.getStats(TopicName.get(partitionedTopicName).getPartition(15).toString());
Map<String, String> subscriptionProperties = stats.getSubscriptions()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.netty.util.Timeout;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
Expand All @@ -42,8 +43,11 @@
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -138,4 +142,38 @@ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws Excep
Assert.assertEquals(ttb.getState(), expectState);
}


@Test
public void testCloseTransactionBufferWhenTimeout() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/test_" + UUID.randomUUID();
PulsarService pulsar = pulsarServiceList.get(0);
BrokerService brokerService0 = pulsar.getBrokerService();
BrokerService brokerService = Mockito.spy(brokerService0);
AtomicReference<PersistentTopic> reference = new AtomicReference<>();
pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
long topicLoadTimeout = TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds() + 1);

Mockito
.doAnswer(inv -> {
Thread.sleep(topicLoadTimeout);
PersistentTopic persistentTopic = (PersistentTopic) inv.callRealMethod();
reference.set(persistentTopic);
return persistentTopic;
})
.when(brokerService)
.newPersistentTopic(Mockito.eq(topic), Mockito.any(), Mockito.eq(brokerService));

CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, true);

Awaitility.waitAtMost(20, TimeUnit.SECONDS)
.pollInterval(Duration.ofSeconds(2)).until(() -> reference.get() != null);
PersistentTopic persistentTopic = reference.get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
TopicTransactionBufferState.State expectState = TopicTransactionBufferState.State.Close;
Assert.assertEquals(ttb.getState(), expectState);
Assert.assertTrue(f.isCompletedExceptionally());
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down