diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index e076821855e59..9f6a10894c968 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -83,6 +83,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData; import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType; +import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; @@ -1677,7 +1678,7 @@ public void testForceDeleteNamespace() throws Exception { public void testForceDeleteNamespaceWithAutomaticTopicCreation() throws Exception { conf.setForceDeleteNamespaceAllowed(true); final String namespaceName = "prop-xyz2/ns1"; - TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); + TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); admin.tenants().createTenant("prop-xyz2", tenantInfo); admin.namespaces().createNamespace(namespaceName, 1); admin.namespaces().setAutoTopicCreation(namespaceName, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 7a6de0d7072d4..ecb587f553a7e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -27,6 +27,7 @@ import java.net.URL; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Optional; import java.util.UUID; @@ -549,7 +550,7 @@ public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Except consumer2.receive(); } - TopicStats stats = getTopicStats(topic1); + TopicStats stats = admin.topics().getStats(topic1); assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5); assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5); @@ -566,7 +567,7 @@ public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Except Thread.sleep(2000L); rolloverStats(); - TopicStats stats2 = getTopicStats(topic1); + TopicStats stats2 = admin.topics().getStats(topic1); // The first 5 messages should be expired due to limit time is 5 seconds, and the last 9 message should not. Awaitility.await().untilAsserted(() -> { assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java index 6b5f002ab4188..f74e72ce88206 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedCursorMetricsTest.java @@ -264,12 +264,12 @@ public void testCursorReadWriteMetrics() throws Exception { metricsList = metrics.generate(); Assert.assertEquals(metricsList.size(), 2); - Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L); - Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L); + Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L); + Assert.assertNotEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L); Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); - Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L); - Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L); + Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 0L); + Assert.assertNotEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 0L); Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L); // cleanup. diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index c78c12eaec0fb..18723684afe42 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -459,7 +459,6 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& } messages_.push(msg); if (messageListener_) { - unAckedMessageTrackerPtr_->add(msg.getMessageId()); listenerExecutor_->postWork( std::bind(&MultiTopicsConsumerImpl::internalListener, shared_from_this(), consumer)); }