diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bfbd65234813e..4c4841b8371fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -56,6 +56,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; @@ -82,6 +83,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; @@ -867,12 +869,337 @@ protected CompletableFuture internalSetNamespaceReplicationClusters(List replicationClusterSet); })) + .thenCompose(replicationClusterSet -> + getNamespacePoliciesAsync(namespaceName) + .thenCompose(policies -> + validateReplicationClusterCompatibility(replicationClusterSet, + policies.replication_clusters)) + .thenApply(__ -> replicationClusterSet)) .thenCompose(replicationClusterSet -> updatePoliciesAsync(namespaceName, policies -> { policies.replication_clusters = replicationClusterSet; return policies; })); } + /** + * Validates compatibility between clusters when enabling namespace-level replication. + * This validation is only performed for newly added clusters. + * This includes: + * + * + * @param replicationClusterSet the new set of clusters to be configured + * @param existingClusters the existing set of replication clusters + * @return a CompletableFuture that completes when validation passes, or fails with RestException + */ + private CompletableFuture validateReplicationClusterCompatibility(Set replicationClusterSet, + Set existingClusters) { + String localCluster = pulsar().getConfiguration().getClusterName(); + + // Skip validation if local cluster is not in the replication set + if (!replicationClusterSet.contains(localCluster)) { + return CompletableFuture.completedFuture(null); + } + + // Find newly added clusters + Set newlyAddedClusters = new HashSet<>(replicationClusterSet); + if (existingClusters != null) { + newlyAddedClusters.removeAll(existingClusters); + } + + if (newlyAddedClusters.isEmpty()) { + // No new clusters added, skip validation + return CompletableFuture.completedFuture(null); + } + + List newRemoteClusters = newlyAddedClusters.stream() + .filter(cluster -> !cluster.equals(localCluster)) + .collect(Collectors.toList()); + + if (newRemoteClusters.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + // Validate compatibility with each newly added remote cluster + List> validationFutures = newRemoteClusters.stream() + .map(remoteCluster -> validateClusterPairCompatibility(localCluster, remoteCluster)) + .collect(Collectors.toList()); + + return FutureUtil.waitForAll(validationFutures); + } + + /** + * Validates compatibility between the local cluster and a remote cluster. + */ + private CompletableFuture validateClusterPairCompatibility(String localCluster, String remoteCluster) { + return clusterResources().getClusterAsync(remoteCluster) + .thenCompose(clusterDataOpt -> { + if (clusterDataOpt.isEmpty()) { + throw new RestException(Status.NOT_FOUND, "Cluster " + remoteCluster + " does not exist"); + } + ClusterData clusterData = clusterDataOpt.get(); + PulsarAdmin remoteAdmin; + try { + remoteAdmin = pulsar().getBrokerService() + .getClusterPulsarAdmin(remoteCluster, Optional.of(clusterData)); + } catch (Exception e) { + throw new RestException(Status.INTERNAL_SERVER_ERROR, + "Failed to update clusters because failed to create admin client for cluster " + + remoteCluster + ": " + e.getMessage()); + } + + // Validate both partition compatibility and auto-creation policy compatibility + return validatePartitionCompatibility(remoteAdmin, remoteCluster) + .thenCompose(__ -> validateAutoTopicCreationCompatibility(remoteAdmin, remoteCluster)); + }); + } + + /** + * Validates partition compatibility between local and remote clusters. + *
    + *
  • All partitioned topics (including __change_events) that exist in the local cluster + * must have the same partition count in the remote cluster (if they exist there).
  • + *
  • Non-partitioned topics in the local cluster must not exist as partitioned topics + * in the remote cluster.
  • + *
+ */ + private CompletableFuture validatePartitionCompatibility(PulsarAdmin remoteAdmin, String remoteCluster) { + // Get local partitioned topics + CompletableFuture> localPartitionedTopicsFuture = + pulsar().getNamespaceService().getFullListOfPartitionedTopic(namespaceName); + + // Get persistent topics only (non-persistent topics don't have persistent state + // and getListOfNonPersistentTopics triggers global namespace ownership validation + // which fails when namespace has no clusters configured yet) + CompletableFuture> localAllTopicsFuture = + pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + + return localPartitionedTopicsFuture.thenCombine(localAllTopicsFuture, + (localPartitionedTopics, localAllTopics) -> { + // Find non-partitioned topics (topics that are not in the partitioned list + // and are not partition suffixes of partitioned topics) + Set partitionedTopicSet = new HashSet<>(localPartitionedTopics); + List localNonPartitionedTopics = localAllTopics.stream() + .map(TopicName::get) + .filter(topicName -> !partitionedTopicSet.contains( + topicName.getPartitionedTopicName())) + .map(TopicName::toString) + .distinct() + .collect(Collectors.toList()); + + return new TopicLists(localPartitionedTopics, localNonPartitionedTopics); + }) + .thenCompose(topicLists -> { + List> validations = new ArrayList<>(); + + // Validate partitioned topics have same partition count + for (String topic : topicLists.partitionedTopics) { + validations.add(compareTopicPartitions(topic, remoteAdmin, remoteCluster)); + } + + // Validate non-partitioned topics don't exist as partitioned on remote + for (String topic : topicLists.nonPartitionedTopics) { + validations.add(validateNonPartitionedTopicCompatibility(topic, remoteAdmin, remoteCluster)); + } + + return FutureUtil.waitForAll(validations); + }); + } + + /** + * Helper class to hold partitioned and non-partitioned topic lists. + */ + private static class TopicLists { + final List partitionedTopics; + final List nonPartitionedTopics; + + TopicLists(List partitionedTopics, List nonPartitionedTopics) { + this.partitionedTopics = partitionedTopics; + this.nonPartitionedTopics = nonPartitionedTopics; + } + } + + /** + * Validates that a non-partitioned topic on local does not exist as a partitioned topic on remote. + */ + private CompletableFuture validateNonPartitionedTopicCompatibility(String topic, PulsarAdmin remoteAdmin, + String remoteCluster) { + return remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic) + .thenAccept(remoteMetadata -> { + // If remote has partitions > 0, it's a partitioned topic, which is incompatible + if (remoteMetadata.partitions > 0) { + throw new RestException(Status.CONFLICT, + String.format("Topic type mismatch for topic '%s': local cluster has a " + + "non-partitioned topic, but remote cluster '%s' has a partitioned " + + "topic with %d partitions. " + + "Please ensure topic types are the same before enabling replication.", + topic, remoteCluster, remoteMetadata.partitions)); + } + }) + .exceptionally(ex -> { + // If topic doesn't exist on remote, that's fine + if (ex.getCause() instanceof PulsarAdminException.NotFoundException + || ex instanceof PulsarAdminException.NotFoundException) { + return null; + } + throw new CompletionException(ex); + }); + } + + /** + * Compares the partition count of a topic between local and remote clusters. + * If the topic exists on local but not on remote, validation passes. + * If the topic exists on both clusters, partition counts must match. + */ + private CompletableFuture compareTopicPartitions(String topic, PulsarAdmin remoteAdmin, + String remoteCluster) { + TopicName topicName = TopicName.get(topic); + + // Get local partition metadata + CompletableFuture> localMetadataFuture = + pulsar().getPulsarResources().getNamespaceResources() + .getPartitionedTopicResources().getPartitionedTopicMetadataAsync(topicName); + + // Get remote partition metadata, return Optional.empty() if topic doesn't exist + CompletableFuture> remoteMetadataFuture = + remoteAdmin.topics().getPartitionedTopicMetadataAsync(topic) + .thenApply(Optional::of) + .exceptionally(ex -> { + // If topic doesn't exist on remote, return empty + if (ex.getCause() instanceof PulsarAdminException.NotFoundException + || ex instanceof PulsarAdminException.NotFoundException) { + return Optional.empty(); + } + throw new CompletionException(ex); + }); + + return localMetadataFuture.thenCombine(remoteMetadataFuture, (localMetadataOpt, remoteMetadataOpt) -> { + // If topic doesn't exist on remote, validation passes + if (remoteMetadataOpt.isEmpty()) { + return null; + } + + int localPartitions = localMetadataOpt.map(m -> m.partitions).orElse(0); + int remotePartitions = remoteMetadataOpt.get().partitions; + + if (localPartitions != remotePartitions) { + String topicType = SystemTopicNames.isTopicPoliciesSystemTopic(topic) + ? "__change_events system topic" : "topic"; + throw new RestException(Status.CONFLICT, + String.format("Partition count mismatch for %s '%s': local cluster has %d partitions, " + + "remote cluster '%s' has %d partitions. " + + "Please ensure partition counts are the same before enabling replication.", + topicType, topic, localPartitions, remoteCluster, remotePartitions)); + } + return null; + }); + } + + /** + * Validates that the effective auto-topic creation policies are the same between local and remote clusters. + * The effective policy is computed by: namespace-level policy overrides broker-level if it exists. + */ + private CompletableFuture validateAutoTopicCreationCompatibility(PulsarAdmin remoteAdmin, + String remoteCluster) { + String namespaceStr = namespaceName.toString(); + + // Get local broker config + ServiceConfiguration localConfig = pulsar().getConfiguration(); + TopicType localBrokerAutoCreationType = localConfig.getAllowAutoTopicCreationType(); + int localBrokerDefaultPartitions = localConfig.getDefaultNumPartitions(); + + // Get local namespace policy + CompletableFuture localNsPolicyFuture = + getNamespacePoliciesAsync(namespaceName) + .thenApply(policies -> policies.autoTopicCreationOverride); + + // Get remote broker config + CompletableFuture> remoteBrokerConfigFuture = + remoteAdmin.brokers().getRuntimeConfigurationsAsync(); + + // Get remote namespace policy + CompletableFuture remoteNsPolicyFuture = + remoteAdmin.namespaces().getAutoTopicCreationAsync(namespaceStr) + .exceptionally(ex -> { + // If namespace doesn't have override, return null. + Throwable actEx = FutureUtil.unwrapCompletionException(ex); + if (actEx instanceof PulsarAdminException.NotFoundException) { + throw new RestException(Status.CONFLICT, + String.format("Effective auto-topic creation policy mismatch for namespace '%s' " + + "between local cluster and remote cluster '%s' -> '%s'. " + + "Please ensure the namespace exists on the remote side.", + namespaceStr, localConfig.getClusterName(), remoteCluster)); + } + throw new CompletionException(ex); + }); + + return CompletableFuture.allOf(localNsPolicyFuture, remoteBrokerConfigFuture, remoteNsPolicyFuture) + .thenAccept(__ -> { + AutoTopicCreationOverride localNsPolicy = localNsPolicyFuture.join(); + Map remoteBrokerConfig = remoteBrokerConfigFuture.join(); + AutoTopicCreationOverride remoteNsPolicy = remoteNsPolicyFuture.join(); + + // Parse remote broker config + String remoteAutoCreationTypeStr = remoteBrokerConfig.getOrDefault( + "allowAutoTopicCreationType", "non-partitioned"); + // Convert to uppercase and replace hyphen with underscore for enum parsing + TopicType remoteBrokerAutoCreationType = TopicType.valueOf( + remoteAutoCreationTypeStr.toUpperCase().replace("-", "_")); + int remoteBrokerDefaultPartitions = Integer.parseInt( + remoteBrokerConfig.getOrDefault("defaultNumPartitions", "1")); + + // Compute effective local policy (namespace-level overrides broker-level if exists) + String localEffectiveTopicType; + int localEffectiveDefaultPartitions; + if (localNsPolicy != null) { + localEffectiveTopicType = localNsPolicy.getTopicType(); + localEffectiveDefaultPartitions = localNsPolicy.getDefaultNumPartitions() != null + ? localNsPolicy.getDefaultNumPartitions() : localBrokerDefaultPartitions; + } else { + localEffectiveTopicType = localBrokerAutoCreationType.toString(); + localEffectiveDefaultPartitions = localBrokerDefaultPartitions; + } + + // Compute effective remote policy (namespace-level overrides broker-level if exists) + String remoteEffectiveTopicType; + int remoteEffectiveDefaultPartitions; + if (remoteNsPolicy != null) { + remoteEffectiveTopicType = remoteNsPolicy.getTopicType(); + remoteEffectiveDefaultPartitions = remoteNsPolicy.getDefaultNumPartitions() != null + ? remoteNsPolicy.getDefaultNumPartitions() : remoteBrokerDefaultPartitions; + } else { + remoteEffectiveTopicType = remoteBrokerAutoCreationType.toString(); + remoteEffectiveDefaultPartitions = remoteBrokerDefaultPartitions; + } + + // Compare effective policies (only topicType and defaultNumPartitions) + List mismatches = new ArrayList<>(); + if (!Objects.equals(localEffectiveTopicType, remoteEffectiveTopicType)) { + mismatches.add(String.format("topicType: local=%s, remote=%s", + localEffectiveTopicType, remoteEffectiveTopicType)); + } + // Pulsar does not allow to set a special partition count with non-partitioned topic type, comparing + // default topic count either default topic type is partitioned or non-partitioned. + if (localEffectiveDefaultPartitions != remoteEffectiveDefaultPartitions) { + mismatches.add(String.format("defaultNumPartitions: local=%d, remote=%d", + localEffectiveDefaultPartitions, remoteEffectiveDefaultPartitions)); + } + + if (!mismatches.isEmpty()) { + throw new RestException(Status.CONFLICT, + String.format("Effective auto-topic creation policy mismatch for namespace '%s' " + + "between local cluster and remote cluster '%s': %s. " + + "Please ensure auto-topic creation policies are the same " + + "before enabling replication.", + namespaceStr, remoteCluster, String.join("; ", mismatches))); + } + }); + } + protected CompletableFuture internalSetNamespaceMessageTTLAsync(Integer messageTTL) { return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.TTL, PolicyOperation.WRITE) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SetReplicationClustersValidationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SetReplicationClustersValidationTest.java new file mode 100644 index 0000000000000..0465b3d065733 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SetReplicationClustersValidationTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import com.google.common.collect.Sets; +import java.util.HashSet; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.TopicType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Tests for validation when setting namespace replication clusters. + * These tests verify that partition compatibility and auto-topic creation policy + * compatibility are properly validated when enabling namespace-level replication. + */ +@Slf4j +@Test(groups = "broker-admin") +public class SetReplicationClustersValidationTest extends OneWayReplicatorTestBase { + + @Override + @BeforeClass(alwaysRun = true, timeOut = 300000) + public void setup() throws Exception { + super.setup(); + } + + @Override + @AfterClass(alwaysRun = true, timeOut = 300000) + public void cleanup() throws Exception { + super.cleanup(); + } + /** + * Helper method to clean up a namespace properly. + */ + private void clearReplicationPolicies(String namespace) throws PulsarAdminException { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1)); + admin2.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster2)); + } + + @Test + public void testSetReplicationClustersSuccess() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testSetReplicationClustersSuccess"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + // Set replication clusters should succeed when no topics exist + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + Set clusters = new HashSet<>(admin1.namespaces().getNamespaceReplicationClusters(namespace)); + assertEquals(clusters, Sets.newHashSet(cluster1, cluster2)); + // cleanup + clearReplicationPolicies(namespace); + } + + @Test + public void testSetReplicationClustersWithMatchingPartitionedTopics() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testMatchingPartitions"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + String topic = "persistent://" + namespace + "/partitioned-topic"; + // Create partitioned topic with same partition count on both clusters + admin1.topics().createPartitionedTopic(topic, 4); + admin2.topics().createPartitionedTopic(topic, 4); + // Set replication clusters should succeed + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + Set clusters = new HashSet<>(admin1.namespaces().getNamespaceReplicationClusters(namespace)); + assertEquals(clusters, Sets.newHashSet(cluster1, cluster2)); + // cleanup + clearReplicationPolicies(namespace); + admin1.topics().deletePartitionedTopic(topic); + admin2.topics().deletePartitionedTopic(topic); + } + + @Test + public void testSetReplicationClustersWithMismatchedPartitionedTopics() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testMismatchedPartitions"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + String topic = "persistent://" + namespace + "/partitioned-topic"; + // Create partitioned topic with different partition counts + admin1.topics().createPartitionedTopic(topic, 4); + admin2.topics().createPartitionedTopic(topic, 8); + // Set replication clusters should fail due to partition mismatch + try { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + fail("Should have failed due to partition count mismatch"); + } catch (PulsarAdminException.ConflictException e) { + assertTrue(e.getMessage().contains("Partition count mismatch")); + assertTrue(e.getMessage().contains("local cluster has 4 partitions")); + assertTrue(e.getMessage().contains("has 8 partitions")); + } + // cleanup + clearReplicationPolicies(namespace); + admin1.topics().deletePartitionedTopic(topic); + admin2.topics().deletePartitionedTopic(topic); + } + + @Test + public void testSetReplicationClustersTopicExistsOnlyOnLocal() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testTopicOnlyOnLocal"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + String topic = "persistent://" + namespace + "/local-only-topic"; + // Create partitioned topic only on local cluster + admin1.topics().createPartitionedTopic(topic, 4); + // Set replication clusters should succeed (topic doesn't exist on remote) + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + Set clusters = new HashSet<>(admin1.namespaces().getNamespaceReplicationClusters(namespace)); + assertEquals(clusters, Sets.newHashSet(cluster1, cluster2)); + // cleanup + clearReplicationPolicies(namespace); + admin1.topics().deletePartitionedTopic(topic); + admin2.namespaces().unload(namespace); + } + + @Test + public void testSetReplicationClustersNonPartitionedVsPartitioned() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testNonPartitionedVsPartitioned"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + String topic = "persistent://" + namespace + "/topic-type-mismatch"; + // Create non-partitioned topic on local, partitioned topic on remote + admin1.topics().createNonPartitionedTopic(topic); + admin2.topics().createPartitionedTopic(topic, 4); + // Set replication clusters should fail due to topic type mismatch + try { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + fail("Should have failed due to topic type mismatch"); + } catch (PulsarAdminException.ConflictException e) { + assertTrue(e.getMessage().contains("Topic type mismatch")); + assertTrue(e.getMessage().contains("non-partitioned topic")); + assertTrue(e.getMessage().contains("partitioned topic")); + } + // cleanup + clearReplicationPolicies(namespace); + admin1.topics().delete(topic); + admin2.topics().deletePartitionedTopic(topic); + } + + @Test + public void testSetReplicationClustersWithMatchingAutoTopicCreationPolicy() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testMatchingAutoTopicPolicy"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + // Set same auto-topic creation policy on both namespaces + AutoTopicCreationOverride policy = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.PARTITIONED.toString()) + .defaultNumPartitions(4) + .build(); + admin1.namespaces().setAutoTopicCreation(namespace, policy); + admin2.namespaces().setAutoTopicCreation(namespace, policy); + // Set replication clusters should succeed + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + Set clusters = new HashSet<>(admin1.namespaces().getNamespaceReplicationClusters(namespace)); + assertEquals(clusters, Sets.newHashSet(cluster1, cluster2)); + // cleanup + clearReplicationPolicies(namespace); + } + + @Test + public void testSetReplicationClustersWithMismatchedAutoTopicCreationType() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testMismatchedAutoTopicType"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + // Set different auto-topic creation types + AutoTopicCreationOverride policy1 = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.PARTITIONED.toString()) + .defaultNumPartitions(4) + .build(); + AutoTopicCreationOverride policy2 = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.NON_PARTITIONED.toString()) + .build(); + admin1.namespaces().setAutoTopicCreation(namespace, policy1); + admin2.namespaces().setAutoTopicCreation(namespace, policy2); + // Set replication clusters should fail due to policy mismatch + try { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + fail("Should have failed due to auto-topic creation policy mismatch"); + } catch (PulsarAdminException.ConflictException e) { + assertTrue(e.getMessage().contains("auto-topic creation policy mismatch")); + assertTrue(e.getMessage().contains("topicType")); + } + } + + @Test + public void testSetReplicationClustersWithMismatchedDefaultNumPartitions() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testMismatchedNumPartitions"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + // Set different default partition counts + AutoTopicCreationOverride policy1 = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.PARTITIONED.toString()) + .defaultNumPartitions(4) + .build(); + AutoTopicCreationOverride policy2 = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.PARTITIONED.toString()) + .defaultNumPartitions(8) + .build(); + admin1.namespaces().setAutoTopicCreation(namespace, policy1); + admin2.namespaces().setAutoTopicCreation(namespace, policy2); + + // Set replication clusters should fail due to partition count mismatch + try { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + fail("Should have failed due to defaultNumPartitions mismatch"); + } catch (PulsarAdminException.ConflictException e) { + assertTrue(e.getMessage().contains("auto-topic creation policy mismatch")); + assertTrue(e.getMessage().contains("defaultNumPartitions")); + } + + // Verify: Pulsar does not allow setting non-partitioned and a certain partition counts. + try { + policy1 = AutoTopicCreationOverride.builder() + .allowAutoTopicCreation(true) + .topicType(TopicType.NON_PARTITIONED.toString()) + .defaultNumPartitions(4) + .build(); + admin1.namespaces().setAutoTopicCreation(namespace, policy1); + fail("Expected behaviour: Pulsar does not allow setting non-partitioned and a certain partition counts"); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("is not allowed to be set when the type is non-partition")); + } + } + + @Test + public void testSetReplicationClustersSkipsValidationWhenLocalClusterNotInSet() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testSkipValidationNoLocalCluster"); + admin1.namespaces().createNamespace(namespace); + admin2.namespaces().createNamespace(namespace); + String topic = "persistent://" + namespace + "/partitioned-topic"; + // Create partitioned topic with different counts. + admin1.topics().createPartitionedTopic(topic, 1); + admin2.topics().createPartitionedTopic(topic, 4); + // This should succeed because validation is skipped when local cluster is not in the set + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster2)); + Set clusters = new HashSet<>(admin1.namespaces().getNamespaceReplicationClusters(namespace)); + assertEquals(clusters, Sets.newHashSet(cluster2)); + // cleanup + clearReplicationPolicies(namespace); + } + + @Test + public void testRemoteSideHasNotNamespace() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("public/testRemoteSideHasNotNamespace"); + + // local namespace does not exist. + try { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("does not exist")); + } + + // remote namespace does not exist. + admin1.namespaces().createNamespace(namespace); + try { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Please ensure the namespace exists on the remote side")); + } + + // Both exist. + admin2.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet(cluster1, cluster2)); + } + +} +