From 6c920694fba081f6802eb2465987aaef703806fa Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Fri, 26 Apr 2024 01:01:45 +0300 Subject: [PATCH 1/2] [fix][broker] Don't check replication clusters for ownership of system topics --- .../admin/impl/PersistentTopicsBase.java | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 682f41dcdb61f..47415333e647e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4304,19 +4304,27 @@ public static CompletableFuture unsafeGetPartitionedTo // and other vital information. Even after namespace starting deletion,, // we need to access the metadata of system topics to create readers and clean up topic data. // If we don't do this, it can prevent namespace deletion due to inaccessible readers. - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName)) - .thenCompose(res -> pulsar.getBrokerService() - .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) - .thenAccept(metadata -> { - if (log.isDebugEnabled()) { - log.debug("Total number of partitions for topic {} is {}", topicName, - metadata.partitions); - } - metadataFuture.complete(metadata); - }).exceptionally(ex -> { - metadataFuture.completeExceptionally(ex.getCause()); - return null; - }); + CompletableFuture clusterOwnershipCheck; + if (isSystemTopic(topicName)) { + clusterOwnershipCheck = CompletableFuture.completedFuture(null); + } else { + clusterOwnershipCheck = + checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) + .thenApply(res -> null); + } + + clusterOwnershipCheck.thenCompose(res -> pulsar.getBrokerService() + .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) + .thenAccept(metadata -> { + if (log.isDebugEnabled()) { + log.debug("Total number of partitions for topic {} is {}", topicName, + metadata.partitions); + } + metadataFuture.complete(metadata); + }).exceptionally(ex -> { + metadataFuture.completeExceptionally(ex.getCause()); + return null; + }); return metadataFuture; } From 23d8b530793d6e3fc30f0462724408ab5d5b3137 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 29 Apr 2024 09:22:15 +0300 Subject: [PATCH 2/2] Check system topic in all locations --- .../broker/admin/impl/PersistentTopicsBase.java | 14 ++------------ .../pulsar/broker/lookup/TopicLookupBase.java | 3 +-- .../pulsar/broker/web/PulsarWebResource.java | 11 +++++++++++ 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 47415333e647e..9b071b7eee3d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4272,8 +4272,7 @@ public CompletableFuture getPartitionedTopicMetadata( // we need to access the metadata of system topics to create readers and clean up topic data. // If we don't do this, it can prevent namespace deletion due to inaccessible readers. authorizationFuture.thenCompose(__ -> - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), - SystemTopicNames.isSystemTopic(topicName))) + checkLocalOrGetPeerReplicationCluster(pulsar, topicName)) .thenCompose(res -> pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) .thenAccept(metadata -> { @@ -4304,16 +4303,7 @@ public static CompletableFuture unsafeGetPartitionedTo // and other vital information. Even after namespace starting deletion,, // we need to access the metadata of system topics to create readers and clean up topic data. // If we don't do this, it can prevent namespace deletion due to inaccessible readers. - CompletableFuture clusterOwnershipCheck; - if (isSystemTopic(topicName)) { - clusterOwnershipCheck = CompletableFuture.completedFuture(null); - } else { - clusterOwnershipCheck = - checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) - .thenApply(res -> null); - } - - clusterOwnershipCheck.thenCompose(res -> pulsar.getBrokerService() + checkLocalOrGetPeerReplicationCluster(pulsar, topicName).thenCompose(res -> pulsar.getBrokerService() .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)) .thenAccept(metadata -> { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index 7b2c777414884..a3e6a724127c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -228,8 +228,7 @@ public static CompletableFuture lookupTopicAsync(PulsarService pulsarSe // and other vital information. Even after namespace starting deletion, // we need to access the metadata of system topics to create readers and clean up topic data. // If we don't do this, it can prevent namespace deletion due to inaccessible readers. - checkLocalOrGetPeerReplicationCluster(pulsarService, - topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName)) + checkLocalOrGetPeerReplicationCluster(pulsarService, topicName) .thenAccept(peerClusterData -> { if (peerClusterData == null) { // (4) all validation passed: initiate lookup diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 99f0a30d1a5f2..3f508385f3329 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic; import com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; @@ -878,10 +879,20 @@ protected CompletableFuture validateGlobalNamespaceOwnershipAsync(Namespac }); } + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, + TopicName topicName) { + if (isSystemTopic(topicName)) { + return CompletableFuture.completedFuture(null); + } else { + return checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject(), false); + } + } + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace) { return checkLocalOrGetPeerReplicationCluster(pulsarService, namespace, false); } + public static CompletableFuture checkLocalOrGetPeerReplicationCluster(PulsarService pulsarService, NamespaceName namespace, boolean allowDeletedNamespace) {