diff --git a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreatObserverAdapter.java b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreatObserverAdapter.java new file mode 100644 index 000000000000..881918bd1aa2 --- /dev/null +++ b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/CreatObserverAdapter.java @@ -0,0 +1,51 @@ +package org.apache.dubbo.reactive; + +import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter; + +import org.mockito.Mockito; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; + +public class CreatObserverAdapter { + + private ServerCallToObserverAdapter responseObserver; + private AtomicInteger nextCounter; + private AtomicInteger completeCounter; + private AtomicInteger errorCounter; + + CreatObserverAdapter() { + + nextCounter = new AtomicInteger(); + completeCounter = new AtomicInteger(); + errorCounter = new AtomicInteger(); + + responseObserver = Mockito.mock(ServerCallToObserverAdapter.class); + doAnswer(o -> nextCounter.incrementAndGet()) + .when(responseObserver).onNext(anyString()); + doAnswer(o -> completeCounter.incrementAndGet()) + .when(responseObserver).onCompleted(); + doAnswer(o -> errorCounter.incrementAndGet()) + .when(responseObserver).onError(any(Throwable.class)); + + } + + public AtomicInteger getCompleteCounter() { + return completeCounter; + } + + public AtomicInteger getNextCounter() { + return nextCounter; + } + + public AtomicInteger getErrorCounter() { + return errorCounter; + } + + public ServerCallToObserverAdapter getResponseObserver() { + return this.responseObserver; + } +} diff --git a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToManyMethodHandlerTest.java b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToManyMethodHandlerTest.java index b1c642967b2e..7723d0c702be 100644 --- a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToManyMethodHandlerTest.java +++ b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToManyMethodHandlerTest.java @@ -19,47 +19,31 @@ import org.apache.dubbo.common.stream.StreamObserver; import org.apache.dubbo.reactive.handler.ManyToManyMethodHandler; -import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; /** * Unit test for ManyToManyMethodHandler */ public final class ManyToManyMethodHandlerTest { - @Test void testInvoke() throws ExecutionException, InterruptedException { - AtomicInteger nextCounter = new AtomicInteger(); - AtomicInteger completeCounter = new AtomicInteger(); - AtomicInteger errorCounter = new AtomicInteger(); - ServerCallToObserverAdapter responseObserver = Mockito.mock(ServerCallToObserverAdapter.class); - doAnswer(o -> nextCounter.incrementAndGet()) - .when(responseObserver).onNext(anyString()); - doAnswer(o -> completeCounter.incrementAndGet()) - .when(responseObserver).onCompleted(); - doAnswer(o -> errorCounter.incrementAndGet()) - .when(responseObserver).onError(any(Throwable.class)); + CreatObserverAdapter creator = new CreatObserverAdapter(); + ManyToManyMethodHandler handler = new ManyToManyMethodHandler<>(requestFlux -> requestFlux.map(r -> r + "0")); - CompletableFuture> future = handler.invoke(new Object[]{responseObserver}); + CompletableFuture> future = handler.invoke(new Object[]{creator.getResponseObserver()}); StreamObserver requestObserver = future.get(); for (int i = 0; i < 10; i++) { requestObserver.onNext(String.valueOf(i)); } requestObserver.onCompleted(); - Assertions.assertEquals(10, nextCounter.get()); - Assertions.assertEquals(0, errorCounter.get()); - Assertions.assertEquals(1, completeCounter.get()); + Assertions.assertEquals(10, creator.getNextCounter().get()); + Assertions.assertEquals(0, creator.getErrorCounter().get()); + Assertions.assertEquals(1, creator.getCompleteCounter().get()); } } diff --git a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToOneMethodHandlerTest.java b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToOneMethodHandlerTest.java index 1a5953fad159..c4ea8e34b10a 100644 --- a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToOneMethodHandlerTest.java +++ b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/ManyToOneMethodHandlerTest.java @@ -22,63 +22,45 @@ import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; /** * Unit test for ManyToOneMethodHandler */ public final class ManyToOneMethodHandlerTest { - @Test - void testInvoker() throws ExecutionException, InterruptedException { - AtomicInteger nextCounter = new AtomicInteger(); - AtomicInteger completeCounter = new AtomicInteger(); - AtomicInteger errorCounter = new AtomicInteger(); - ServerCallToObserverAdapter responseObserver = Mockito.mock(ServerCallToObserverAdapter.class); - doAnswer(o -> nextCounter.incrementAndGet()) - .when(responseObserver).onNext(anyString()); - doAnswer(o -> completeCounter.incrementAndGet()) - .when(responseObserver).onCompleted(); - doAnswer(o -> errorCounter.incrementAndGet()) - .when(responseObserver).onError(any(Throwable.class)); + private StreamObserver requestObserver; + private CreatObserverAdapter creator; + + @BeforeEach + void init() throws ExecutionException, InterruptedException { + creator = new CreatObserverAdapter(); ManyToOneMethodHandler handler = new ManyToOneMethodHandler<>(requestFlux -> requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf)); - CompletableFuture> future = handler.invoke(new Object[]{responseObserver}); - StreamObserver requestObserver = future.get(); + CompletableFuture> future = handler.invoke(new Object[]{creator.getResponseObserver()}); + requestObserver = future.get(); + } + + @Test + void testInvoker() { for (int i = 0; i < 10; i++) { requestObserver.onNext(String.valueOf(i)); } requestObserver.onCompleted(); - Assertions.assertEquals(1, nextCounter.get()); - Assertions.assertEquals(0, errorCounter.get()); - Assertions.assertEquals(1, completeCounter.get()); + Assertions.assertEquals(1, creator.getNextCounter().get()); + Assertions.assertEquals(0, creator.getErrorCounter().get()); + Assertions.assertEquals(1, creator.getCompleteCounter().get()); } @Test - void testError() throws ExecutionException, InterruptedException { - AtomicInteger nextCounter = new AtomicInteger(); - AtomicInteger completeCounter = new AtomicInteger(); - AtomicInteger errorCounter = new AtomicInteger(); - ServerCallToObserverAdapter responseObserver = Mockito.mock(ServerCallToObserverAdapter.class); - doAnswer(o -> nextCounter.incrementAndGet()) - .when(responseObserver).onNext(anyString()); - doAnswer(o -> completeCounter.incrementAndGet()) - .when(responseObserver).onCompleted(); - doAnswer(o -> errorCounter.incrementAndGet()) - .when(responseObserver).onError(any(Throwable.class)); - ManyToOneMethodHandler handler = new ManyToOneMethodHandler<>(requestFlux -> - requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf)); - CompletableFuture> future = handler.invoke(new Object[]{responseObserver}); - StreamObserver requestObserver = future.get(); + void testError() { for (int i = 0; i < 10; i++) { if (i == 6) { requestObserver.onError(new Throwable()); @@ -86,8 +68,8 @@ void testError() throws ExecutionException, InterruptedException { requestObserver.onNext(String.valueOf(i)); } requestObserver.onCompleted(); - Assertions.assertEquals(0, nextCounter.get()); - Assertions.assertEquals(1, errorCounter.get()); - Assertions.assertEquals(0, completeCounter.get()); + Assertions.assertEquals(0, creator.getNextCounter().get()); + Assertions.assertEquals(1, creator.getErrorCounter().get()); + Assertions.assertEquals(0, creator.getCompleteCounter().get()); } } diff --git a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/OneToManyMethodHandlerTest.java b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/OneToManyMethodHandlerTest.java index f1cdeb6f6547..8e0622a9a544 100644 --- a/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/OneToManyMethodHandlerTest.java +++ b/dubbo-plugin/dubbo-reactive/src/test/java/org/apache/dubbo/reactive/OneToManyMethodHandlerTest.java @@ -21,57 +21,42 @@ import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import reactor.core.publisher.Flux; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; /** * Unit test for OneToManyMethodHandler */ public final class OneToManyMethodHandlerTest { + private CreatObserverAdapter creator; + + @BeforeEach + void init() { + creator = new CreatObserverAdapter(); + } + @Test void testInvoke() { String request = "1,2,3,4,5,6,7"; - AtomicInteger nextCounter = new AtomicInteger(); - AtomicInteger completeCounter = new AtomicInteger(); - AtomicInteger errorCounter = new AtomicInteger(); - ServerCallToObserverAdapter responseObserver = Mockito.mock(ServerCallToObserverAdapter.class); - doAnswer(o -> nextCounter.incrementAndGet()) - .when(responseObserver).onNext(anyString()); - doAnswer(o -> completeCounter.incrementAndGet()) - .when(responseObserver).onCompleted(); - doAnswer(o -> errorCounter.incrementAndGet()) - .when(responseObserver).onError(any(Throwable.class)); OneToManyMethodHandler handler = new OneToManyMethodHandler<>(requestMono -> requestMono.flatMapMany(r -> Flux.fromArray(r.split(",")))); - CompletableFuture future = handler.invoke(new Object[]{request, responseObserver}); + CompletableFuture future = handler.invoke(new Object[]{request, creator.getResponseObserver()}); Assertions.assertTrue(future.isDone()); - Assertions.assertEquals(7, nextCounter.get()); - Assertions.assertEquals(0, errorCounter.get()); - Assertions.assertEquals(1, completeCounter.get()); + Assertions.assertEquals(7, creator.getNextCounter().get()); + Assertions.assertEquals(0, creator.getErrorCounter().get()); + Assertions.assertEquals(1, creator.getCompleteCounter().get()); } @Test void testError() { String request = "1,2,3,4,5,6,7"; - AtomicInteger nextCounter = new AtomicInteger(); - AtomicInteger completeCounter = new AtomicInteger(); - AtomicInteger errorCounter = new AtomicInteger(); - ServerCallToObserverAdapter responseObserver = Mockito.mock(ServerCallToObserverAdapter.class); - doAnswer(o -> nextCounter.incrementAndGet()) - .when(responseObserver).onNext(anyString()); - doAnswer(o -> completeCounter.incrementAndGet()) - .when(responseObserver).onCompleted(); - doAnswer(o -> errorCounter.incrementAndGet()) - .when(responseObserver).onError(any(Throwable.class)); OneToManyMethodHandler handler = new OneToManyMethodHandler<>(requestMono -> Flux.create(emitter -> { for (int i = 0; i < 10; i++) { @@ -82,10 +67,10 @@ void testError() { } } })); - CompletableFuture future = handler.invoke(new Object[]{request, responseObserver}); + CompletableFuture future = handler.invoke(new Object[]{request, creator.getResponseObserver()}); Assertions.assertTrue(future.isDone()); - Assertions.assertEquals(6, nextCounter.get()); - Assertions.assertEquals(1, errorCounter.get()); - Assertions.assertEquals(0, completeCounter.get()); + Assertions.assertEquals(6, creator.getNextCounter().get()); + Assertions.assertEquals(1, creator.getErrorCounter().get()); + Assertions.assertEquals(0, creator.getCompleteCounter().get()); } }