From c4c60fe54920e77bf7867869d94bd02d3cb1a1ae Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Mon, 14 Oct 2024 16:25:41 -0700 Subject: [PATCH 1/5] [fix][broker] Do not record as a topic load failure for the requested broker is not the owner --- .../apache/pulsar/broker/PulsarService.java | 5 +++ .../pulsar/broker/service/BrokerService.java | 14 ++++++-- .../auth/MockedPulsarServiceBaseTest.java | 8 +++++ .../namespace/NamespaceServiceTest.java | 8 ----- .../broker/service/BrokerServiceTest.java | 34 ++++++++++++++++++- .../pulsar/client/api/SimpleSchemaTest.java | 5 --- .../pulsar/client/impl/TableViewTest.java | 5 --- .../org/apache/pulsar/schema/SchemaTest.java | 5 --- 8 files changed, 57 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index dcc0e961275bd..188699394bdf2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -2159,4 +2159,9 @@ private TopicPoliciesService initTopicPoliciesService() throws Exception { return (TopicPoliciesService) Reflections.createInstance(className, Thread.currentThread().getContextClassLoader()); } + + @VisibleForTesting + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index fee5e25647ce6..84373cba7e5ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1318,7 +1318,11 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture> createNonPersistentTopic(String topic) { CompletableFuture> topicFuture = new CompletableFuture<>(); topicFuture.exceptionally(t -> { - pulsarStats.recordTopicLoadFailed(); + // If the broker is not the owner of the topic, the client will redirect to another broker. + // In this case, we should not record as a topic load failure. + if (!(FutureUtil.unwrapCompletionException(t) instanceof ServiceUnitNotReadyException)) { + getPulsarStats().recordTopicLoadFailed(); + } pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); return null; }); @@ -1612,8 +1616,12 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); topicFuture.exceptionally(t -> { - pulsarStats.recordTopicLoadFailed(); - return null; + // If the broker is not the owner of the topic, the client will redirect to another broker. + // In this case, we should not record as a topic load failure. + if (!(FutureUtil.unwrapCompletionException(t) instanceof ServiceUnitNotReadyException)) { + getPulsarStats().recordTopicLoadFailed(); + } + return Optional.empty(); }); checkTopicNsOwnership(topic) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 8dd2fc1c3c26d..7aabd62e63c3f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -714,6 +714,14 @@ public Object[][] incorrectPersistentPolicies() { }; } + @DataProvider(name = "TopicDomain") + public Object[][] topicDomain() { + return new Object[][] { + {"persistent"}, + {"non-persistent"} + }; + } + protected ServiceProducer getServiceProducer(ProducerImpl clientProducer, String topicName) { PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 951247bd68861..7c784416f3392 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -808,14 +808,6 @@ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception { assertFalse(getResult.isPresent()); } - @DataProvider(name = "topicDomain") - public Object[] topicDomain() { - return new Object[]{ - TopicDomain.persistent.value(), - TopicDomain.non_persistent.value() - }; - } - @Test(dataProvider = "topicDomain") public void testCheckTopicExists(String topicDomain) throws Exception { String topic = topicDomain + "://prop/ns-abc/" + UUID.randomUUID(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index e05bb836a3ce6..fe3e09b39f9f8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -26,7 +26,9 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -50,6 +52,7 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -1308,7 +1311,7 @@ public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() thro topic.checkInactiveSubscriptions(); // Compaction subscription should not call delete method. - Mockito.verify(spySubscription, Mockito.never()).delete(); + verify(spySubscription, never()).delete(); // check if the subscription exist. assertNotNull(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION)); @@ -1916,5 +1919,34 @@ public void close() { } } } + + @Test(dataProvider = "TopicDomain") + public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException(String topicDomain) throws Exception { + final String topicName = topicDomain + "://prop/ns-abc/topic-not-ready"; + BrokerService service = pulsar.getBrokerService(); + + // Mock the checkTopicNsOwnership to throw ServiceUnitNotReadyException + BrokerService spyService = spy(service); + doReturn(CompletableFuture.failedFuture( + new BrokerServiceException.ServiceUnitNotReadyException("Test exception"))) + .when(spyService).checkTopicNsOwnership(topicName); + + pulsar.setBrokerService(spyService); + when(spyService.getPulsarStats()).thenReturn(mock(PulsarStats.class)); + + // Call the method under test + CompletableFuture> future = spyService.loadOrCreatePersistentTopic(topicName, true, + Collections.emptyMap(), null); + + try { + future.get(); + fail("Should have thrown an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException); + } + + // Verify that recordTopicLoadFailed was not called + verify(spyService.getPulsarStats(), never()).recordTopicLoadFailed(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java index e006b72fad279..71a1686555939 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java @@ -104,11 +104,6 @@ public static Object[][] schemaValidationModes() { }; } - @DataProvider(name = "topicDomain") - public static Object[] topicDomain() { - return new Object[] { "persistent://", "non-persistent://" }; - } - private final boolean schemaValidationEnforced; @Factory(dataProvider = "schemaValidationModes") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 5448751160a9c..4db3b27e1c544 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -98,11 +98,6 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @DataProvider(name = "topicDomain") - public static Object[] topicDomain() { - return new Object[]{ TopicDomain.persistent.value(), TopicDomain.non_persistent.value()}; - } - private Set publishMessages(String topic, int count, boolean enableBatch) throws Exception { return publishMessages(topic, 0, count, enableBatch, false); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index ab82f981b5df3..ccaf2b3da85b2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -128,11 +128,6 @@ public void cleanup() throws Exception { super.internalCleanup(); } - @DataProvider(name = "topicDomain") - public static Object[] topicDomain() { - return new Object[] { "persistent://", "non-persistent://" }; - } - @Test public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{ final String tenant = PUBLIC_TENANT; From 16e645f1bc6db120f00a66fa3290b3a00d450436 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Mon, 14 Oct 2024 16:42:08 -0700 Subject: [PATCH 2/5] Check error message --- .../org/apache/pulsar/broker/service/BrokerService.java | 8 ++++++-- .../apache/pulsar/broker/service/BrokerServiceTest.java | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 84373cba7e5ca..eac0a4719a639 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1320,7 +1320,9 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.exceptionally(t -> { // If the broker is not the owner of the topic, the client will redirect to another broker. // In this case, we should not record as a topic load failure. - if (!(FutureUtil.unwrapCompletionException(t) instanceof ServiceUnitNotReadyException)) { + Throwable cause = FutureUtil.unwrapCompletionException(t); + if (!(cause instanceof ServiceUnitNotReadyException) + && !(cause.getMessage().contains("not served by this instance"))) { getPulsarStats().recordTopicLoadFailed(); } pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); @@ -1618,7 +1620,9 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S topicFuture.exceptionally(t -> { // If the broker is not the owner of the topic, the client will redirect to another broker. // In this case, we should not record as a topic load failure. - if (!(FutureUtil.unwrapCompletionException(t) instanceof ServiceUnitNotReadyException)) { + Throwable cause = FutureUtil.unwrapCompletionException(t); + if (!(cause instanceof ServiceUnitNotReadyException) + && !(cause.getMessage().contains("not served by this instance"))) { getPulsarStats().recordTopicLoadFailed(); } return Optional.empty(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index fe3e09b39f9f8..a11561f9e70cc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1928,7 +1928,9 @@ public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException(Stri // Mock the checkTopicNsOwnership to throw ServiceUnitNotReadyException BrokerService spyService = spy(service); doReturn(CompletableFuture.failedFuture( - new BrokerServiceException.ServiceUnitNotReadyException("Test exception"))) + new BrokerServiceException.ServiceUnitNotReadyException("Namespace bundle for topic " + + "(persistent://xxx/xxx/xxx-partition-0) not served by this instance:xyz. " + + "Please redo the lookup. Request is denied: namespace=pfs/ons"))) .when(spyService).checkTopicNsOwnership(topicName); pulsar.setBrokerService(spyService); From 1e5adcff9751847482a449d80309de906644d607 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Mon, 14 Oct 2024 17:26:17 -0700 Subject: [PATCH 3/5] Revert the change for non-persistent topic --- .../pulsar/broker/service/BrokerService.java | 13 ++---- .../broker/service/BrokerServiceTest.java | 43 ++++++++++++++++--- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index eac0a4719a639..6e785b8fb72b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1320,12 +1320,7 @@ private CompletableFuture> createNonPersistentTopic(String topic topicFuture.exceptionally(t -> { // If the broker is not the owner of the topic, the client will redirect to another broker. // In this case, we should not record as a topic load failure. - Throwable cause = FutureUtil.unwrapCompletionException(t); - if (!(cause instanceof ServiceUnitNotReadyException) - && !(cause.getMessage().contains("not served by this instance"))) { - getPulsarStats().recordTopicLoadFailed(); - } - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + pulsarStats.recordTopicLoadFailed(); return null; }); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -1621,11 +1616,11 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S // If the broker is not the owner of the topic, the client will redirect to another broker. // In this case, we should not record as a topic load failure. Throwable cause = FutureUtil.unwrapCompletionException(t); - if (!(cause instanceof ServiceUnitNotReadyException) - && !(cause.getMessage().contains("not served by this instance"))) { + if (!(cause instanceof ServiceUnitNotReadyException + && cause.getMessage().contains("not served by this instance"))) { getPulsarStats().recordTopicLoadFailed(); } - return Optional.empty(); + return null; }); checkTopicNsOwnership(topic) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index a11561f9e70cc..07cee6ba85a72 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -1920,9 +1921,8 @@ public void close() { } } - @Test(dataProvider = "TopicDomain") - public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException(String topicDomain) throws Exception { - final String topicName = topicDomain + "://prop/ns-abc/topic-not-ready"; + public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException() throws Exception { + final String topicName = "persistent://prop/ns-abc/topic-not-ready"; BrokerService service = pulsar.getBrokerService(); // Mock the checkTopicNsOwnership to throw ServiceUnitNotReadyException @@ -1937,8 +1937,7 @@ public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException(Stri when(spyService.getPulsarStats()).thenReturn(mock(PulsarStats.class)); // Call the method under test - CompletableFuture> future = spyService.loadOrCreatePersistentTopic(topicName, true, - Collections.emptyMap(), null); + CompletableFuture> future = spyService.getTopic(topicName, true); try { future.get(); @@ -1949,6 +1948,40 @@ public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException(Stri // Verify that recordTopicLoadFailed was not called verify(spyService.getPulsarStats(), never()).recordTopicLoadFailed(); + + doReturn(CompletableFuture.failedFuture( + new BrokerServiceException.ServerMetadataException("Server metadata error"))) + .when(spyService).checkTopicNsOwnership(topicName); + + // Call the method under test + future = spyService.getTopic(topicName, true); + + try { + future.get(); + fail("Should have thrown an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.ServerMetadataException); + } + + // Verify that recordTopicLoadFailed was not called + verify(spyService.getPulsarStats(), times(1)).recordTopicLoadFailed(); + + doReturn(CompletableFuture.failedFuture( + new BrokerServiceException.ServiceUnitNotReadyException("Other exception"))) + .when(spyService).checkTopicNsOwnership(topicName); + + // Call the method under test + future = spyService.getTopic(topicName, true); + + try { + future.get(); + fail("Should have thrown an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof BrokerServiceException.ServiceUnitNotReadyException); + } + + // Verify that recordTopicLoadFailed was not called + verify(spyService.getPulsarStats(), times(2)).recordTopicLoadFailed(); } } From 8a397b74f7ec05ec87e18908a48ecb484268026d Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Mon, 14 Oct 2024 17:27:50 -0700 Subject: [PATCH 4/5] Adjust tests --- .../main/java/org/apache/pulsar/broker/PulsarService.java | 5 ----- .../org/apache/pulsar/broker/service/BrokerServiceTest.java | 1 - 2 files changed, 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 188699394bdf2..dcc0e961275bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -2159,9 +2159,4 @@ private TopicPoliciesService initTopicPoliciesService() throws Exception { return (TopicPoliciesService) Reflections.createInstance(className, Thread.currentThread().getContextClassLoader()); } - - @VisibleForTesting - public void setBrokerService(BrokerService brokerService) { - this.brokerService = brokerService; - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 07cee6ba85a72..23a6bbc0eface 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1933,7 +1933,6 @@ public void testLoadOrCreatePersistentTopicWithServiceUnitNotReadyException() th + "Please redo the lookup. Request is denied: namespace=pfs/ons"))) .when(spyService).checkTopicNsOwnership(topicName); - pulsar.setBrokerService(spyService); when(spyService.getPulsarStats()).thenReturn(mock(PulsarStats.class)); // Call the method under test From 18d61361302fe96ff68300e4bbe2ed565aa41992 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Mon, 14 Oct 2024 23:29:40 -0700 Subject: [PATCH 5/5] Fix checkstyle --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 3 +-- .../apache/pulsar/broker/namespace/NamespaceServiceTest.java | 2 -- .../org/apache/pulsar/broker/service/BrokerServiceTest.java | 1 - .../src/test/java/org/apache/pulsar/schema/SchemaTest.java | 1 - 4 files changed, 1 insertion(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6e785b8fb72b4..18808c1801483 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1318,9 +1318,8 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture> createNonPersistentTopic(String topic) { CompletableFuture> topicFuture = new CompletableFuture<>(); topicFuture.exceptionally(t -> { - // If the broker is not the owner of the topic, the client will redirect to another broker. - // In this case, we should not record as a topic load failure. pulsarStats.recordTopicLoadFailed(); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); return null; }); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 7c784416f3392..f9836bbf5c093 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -78,7 +78,6 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; @@ -103,7 +102,6 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "flaky") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 23a6bbc0eface..0755861af5954 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -53,7 +53,6 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index ccaf2b3da85b2..6db29ef64ba6c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -98,7 +98,6 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j