Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4272,8 +4272,7 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
// we need to access the metadata of system topics to create readers and clean up topic data.
// If we don't do this, it can prevent namespace deletion due to inaccessible readers.
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(),
SystemTopicNames.isSystemTopic(topicName)))
checkLocalOrGetPeerReplicationCluster(pulsar, topicName))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
Expand Down Expand Up @@ -4304,19 +4303,18 @@ public static CompletableFuture<PartitionedTopicMetadata> unsafeGetPartitionedTo
// and other vital information. Even after namespace starting deletion,,
// we need to access the metadata of system topics to create readers and clean up topic data.
// If we don't do this, it can prevent namespace deletion due to inaccessible readers.
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName))
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("Total number of partitions for topic {} is {}", topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex.getCause());
return null;
});
checkLocalOrGetPeerReplicationCluster(pulsar, topicName).thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("Total number of partitions for topic {} is {}", topicName,
metadata.partitions);
}
metadataFuture.complete(metadata);
}).exceptionally(ex -> {
metadataFuture.completeExceptionally(ex.getCause());
return null;
});

return metadataFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,7 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
// and other vital information. Even after namespace starting deletion,
// we need to access the metadata of system topics to create readers and clean up topic data.
// If we don't do this, it can prevent namespace deletion due to inaccessible readers.
checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName))
checkLocalOrGetPeerReplicationCluster(pulsarService, topicName)
.thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
Expand Down Expand Up @@ -878,10 +879,20 @@ protected CompletableFuture<Void> validateGlobalNamespaceOwnershipAsync(Namespac
});
}

public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
TopicName topicName) {
if (isSystemTopic(topicName)) {
return CompletableFuture.completedFuture(null);
} else {
return checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject(), false);
}
}

public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
NamespaceName namespace) {
return checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false);
}

public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService,
NamespaceName namespace,
boolean allowDeletedNamespace) {
Expand Down