diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 682f41dcdb61f..9b071b7eee3d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4272,8 +4272,7 @@ public CompletableFuture getPartitionedTopicMetadata( // we need to access the metadata of system topics to create readers and clean up topic data. // If we don't do this, it can prevent namespace deletion due to inaccessible readers. authorizationFuture.thenCompose(__ -> - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), - SystemTopicNames.isSystemTopic(topicName))) + checkLocalOrGetPeerReplicationCluster(pulsar, topicName)) .thenCompose(res -> pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) .thenAccept(metadata -> { @@ -4304,19 +4303,18 @@ public static CompletableFuture unsafeGetPartitionedTo // and other vital information. Even after namespace starting deletion,, // we need to access the metadata of system topics to create readers and clean up topic data. // If we don't do this, it can prevent namespace deletion due to inaccessible readers. - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName)) - .thenCompose(res -> pulsar.getBrokerService() - .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) - .thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("Total number of partitions for topic {} is {}", topicName, - metadata.partitions); - } - metadataFuture.complete(metadata); - }).exceptionally(ex -> { - metadataFuture.completeExceptionally(ex.getCause()); - return null; - }); + checkLocalOrGetPeerReplicationCluster(pulsar, topicName).thenCompose(res -> pulsar.getBrokerService() + .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) + .thenAccept(metadata -> { + if (log.isDebugEnabled()) { + log.debug("Total number of partitions for topic {} is {}", topicName, + metadata.partitions); + } + metadataFuture.complete(metadata); + }).exceptionally(ex -> { + metadataFuture.completeExceptionally(ex.getCause()); + return null; + }); return metadataFuture; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 7b2c777414884..a3e6a724127c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -228,8 +228,7 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe // and other vital information. Even after namespace starting deletion, // we need to access the metadata of system topics to create readers and clean up topic data. // If we don't do this, it can prevent namespace deletion due to inaccessible readers. - checkLocalOrGetPeerReplicationCluster(pulsarService, - topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName)) + checkLocalOrGetPeerReplicationCluster(pulsarService, topicName) .thenAccept(peerClusterData -> { if (peerClusterData == null) { // (4) all validation passed: initiate lookup diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 99f0a30d1a5f2..3f508385f3329 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; @@ -878,10 +879,20 @@ protected CompletableFuture validateGlobalNamespaceOwnershipAsync(Namespac }); } + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, + TopicName topicName) { + if (isSystemTopic(topicName)) { + return CompletableFuture.completedFuture(null); + } else { + return checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject(), false); + } + } + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { return checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false); } + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace, boolean allowDeletedNamespace) {