Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.apache.dubbo.reactive;

import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.mockito.Mockito;

import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

public class CreatObserverAdapter {

private ServerCallToObserverAdapter<String> responseObserver;
private AtomicInteger nextCounter;
private AtomicInteger completeCounter;
private AtomicInteger errorCounter;

CreatObserverAdapter() {

nextCounter = new AtomicInteger();
completeCounter = new AtomicInteger();
errorCounter = new AtomicInteger();

responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));

}

public AtomicInteger getCompleteCounter() {
return completeCounter;
}

public AtomicInteger getNextCounter() {
return nextCounter;
}

public AtomicInteger getErrorCounter() {
return errorCounter;
}

public ServerCallToObserverAdapter<String> getResponseObserver() {
return this.responseObserver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,31 @@

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.reactive.handler.ManyToManyMethodHandler;
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

/**
* Unit test for ManyToManyMethodHandler
*/
public final class ManyToManyMethodHandlerTest {

@Test
void testInvoke() throws ExecutionException, InterruptedException {
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
CreatObserverAdapter creator = new CreatObserverAdapter();

ManyToManyMethodHandler<String, String> handler = new ManyToManyMethodHandler<>(requestFlux ->
requestFlux.map(r -> r + "0"));
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{responseObserver});
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{creator.getResponseObserver()});
StreamObserver<String> requestObserver = future.get();
for (int i = 0; i < 10; i++) {
requestObserver.onNext(String.valueOf(i));
}
requestObserver.onCompleted();
Assertions.assertEquals(10, nextCounter.get());
Assertions.assertEquals(0, errorCounter.get());
Assertions.assertEquals(1, completeCounter.get());
Assertions.assertEquals(10, creator.getNextCounter().get());
Assertions.assertEquals(0, creator.getErrorCounter().get());
Assertions.assertEquals(1, creator.getCompleteCounter().get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,72 +22,54 @@
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

/**
* Unit test for ManyToOneMethodHandler
*/
public final class ManyToOneMethodHandlerTest {

@Test
void testInvoker() throws ExecutionException, InterruptedException {
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
private StreamObserver<String> requestObserver;
private CreatObserverAdapter creator;

@BeforeEach
void init() throws ExecutionException, InterruptedException {
creator = new CreatObserverAdapter();
ManyToOneMethodHandler<String, String> handler = new ManyToOneMethodHandler<>(requestFlux ->
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{responseObserver});
StreamObserver<String> requestObserver = future.get();
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{creator.getResponseObserver()});
requestObserver = future.get();
}

@Test
void testInvoker() {
for (int i = 0; i < 10; i++) {
requestObserver.onNext(String.valueOf(i));
}
requestObserver.onCompleted();
Assertions.assertEquals(1, nextCounter.get());
Assertions.assertEquals(0, errorCounter.get());
Assertions.assertEquals(1, completeCounter.get());
Assertions.assertEquals(1, creator.getNextCounter().get());
Assertions.assertEquals(0, creator.getErrorCounter().get());
Assertions.assertEquals(1, creator.getCompleteCounter().get());
}

@Test
void testError() throws ExecutionException, InterruptedException {
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
ManyToOneMethodHandler<String, String> handler = new ManyToOneMethodHandler<>(requestFlux ->
requestFlux.map(Integer::valueOf).reduce(Integer::sum).map(String::valueOf));
CompletableFuture<StreamObserver<String>> future = handler.invoke(new Object[]{responseObserver});
StreamObserver<String> requestObserver = future.get();
void testError() {
for (int i = 0; i < 10; i++) {
if (i == 6) {
requestObserver.onError(new Throwable());
}
requestObserver.onNext(String.valueOf(i));
}
requestObserver.onCompleted();
Assertions.assertEquals(0, nextCounter.get());
Assertions.assertEquals(1, errorCounter.get());
Assertions.assertEquals(0, completeCounter.get());
Assertions.assertEquals(0, creator.getNextCounter().get());
Assertions.assertEquals(1, creator.getErrorCounter().get());
Assertions.assertEquals(0, creator.getCompleteCounter().get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,57 +21,42 @@
import org.apache.dubbo.rpc.protocol.tri.observer.ServerCallToObserverAdapter;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;

/**
* Unit test for OneToManyMethodHandler
*/
public final class OneToManyMethodHandlerTest {

private CreatObserverAdapter creator;

@BeforeEach
void init() {
creator = new CreatObserverAdapter();
}

@Test
void testInvoke() {
String request = "1,2,3,4,5,6,7";
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
OneToManyMethodHandler<String, String> handler = new OneToManyMethodHandler<>(requestMono ->
requestMono.flatMapMany(r -> Flux.fromArray(r.split(","))));
CompletableFuture<?> future = handler.invoke(new Object[]{request, responseObserver});
CompletableFuture<?> future = handler.invoke(new Object[]{request, creator.getResponseObserver()});
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(7, nextCounter.get());
Assertions.assertEquals(0, errorCounter.get());
Assertions.assertEquals(1, completeCounter.get());
Assertions.assertEquals(7, creator.getNextCounter().get());
Assertions.assertEquals(0, creator.getErrorCounter().get());
Assertions.assertEquals(1, creator.getCompleteCounter().get());
}

@Test
void testError() {
String request = "1,2,3,4,5,6,7";
AtomicInteger nextCounter = new AtomicInteger();
AtomicInteger completeCounter = new AtomicInteger();
AtomicInteger errorCounter = new AtomicInteger();
ServerCallToObserverAdapter<String> responseObserver = Mockito.mock(ServerCallToObserverAdapter.class);
doAnswer(o -> nextCounter.incrementAndGet())
.when(responseObserver).onNext(anyString());
doAnswer(o -> completeCounter.incrementAndGet())
.when(responseObserver).onCompleted();
doAnswer(o -> errorCounter.incrementAndGet())
.when(responseObserver).onError(any(Throwable.class));
OneToManyMethodHandler<String, String> handler = new OneToManyMethodHandler<>(requestMono ->
Flux.create(emitter -> {
for (int i = 0; i < 10; i++) {
Expand All @@ -82,10 +67,10 @@ void testError() {
}
}
}));
CompletableFuture<?> future = handler.invoke(new Object[]{request, responseObserver});
CompletableFuture<?> future = handler.invoke(new Object[]{request, creator.getResponseObserver()});
Assertions.assertTrue(future.isDone());
Assertions.assertEquals(6, nextCounter.get());
Assertions.assertEquals(1, errorCounter.get());
Assertions.assertEquals(0, completeCounter.get());
Assertions.assertEquals(6, creator.getNextCounter().get());
Assertions.assertEquals(1, creator.getErrorCounter().get());
Assertions.assertEquals(0, creator.getCompleteCounter().get());
}
}