Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
.idea/
build/
out/
.profileconfig.json

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -47,7 +48,7 @@ public static <T> Single<T> toSingle(CompletionStage<T> future) {
future.whenComplete((result, throwable) -> {
if (!emitter.isDisposed()) {
if (throwable != null) {
emitter.onError(throwable);
emitter.onError(new ExecutionException(throwable));
} else if (result != null) {
emitter.onSuccess(result);
} else {
Expand Down Expand Up @@ -79,7 +80,7 @@ public static <T> Maybe<T> toMaybe(CompletionStage<T> future) {
future.whenComplete((result, throwable) -> {
if (!emitter.isDisposed()) {
if (throwable != null) {
emitter.onError(throwable);
emitter.onError(new ExecutionException(throwable));
} else if (result != null) {
emitter.onSuccess(result);
} else {
Expand Down Expand Up @@ -110,7 +111,7 @@ public static Completable toCompletable(CompletionStage<Void> future) {
if (throwable != null) {
// Emit only the cause message
if (throwable.getCause() != null) {
emitter.onError(throwable.getCause());
emitter.onError(new ExecutionException(throwable));
} else {
emitter.onError(throwable);
}
Expand Down
160 changes: 151 additions & 9 deletions src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.time.Duration;
Expand All @@ -19,8 +23,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class Rx3UtilTest {
Expand All @@ -29,21 +36,69 @@ class Rx3UtilTest {

RuntimeException textEx = new RuntimeException("test error");

static final AtomicReference<Throwable> caughtError = new AtomicReference<>();

@BeforeAll
static void beforeAll() {
RxJavaPlugins.setErrorHandler(caughtError::set);
}

@BeforeEach
void setup() {
caughtError.set(null);
}

//////////////
// toSingle()

@Test
void shouldConvertCompletionStageToSingle() {
var future = CompletableFuture.completedStage("test");
var result = Rx3Util.toSingle(future).blockingGet();
assertEquals("test", result);
}

@Test
void confirmCatchSingleFromFuture() {
// This test confirms how RxJava's fromFuture works, so it can be replicated below
var future = CompletableFuture.failedFuture(textEx);
var single = Single.fromFuture(future);
var ex = assertThrows(RuntimeException.class, single::blockingGet);
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
void shouldCatchCompletionStageErrorsToSingle() {
var future = CompletableFuture.failedStage(textEx);
var future = CompletableFuture.failedFuture(textEx);
var single = Rx3Util.toSingle(future);
var ex = assertThrows(RuntimeException.class, single::blockingGet);
assertEquals("test error", ex.getMessage());
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
@SneakyThrows
void shouldCancelSingle() {
var future = delayedErrorFuture();
var single = Rx3Util.toSingle(future);
var disposable = single.subscribe();
disposable.dispose();
assertTrue(disposable.isDisposed());
assertTrue(future.isCancelled());
Thread.sleep(200);
assertNull(caughtError.get());
}

//////////////
// toMaybe()

@Test
void shouldConvertCompletionStageToMaybe() {
var future = CompletableFuture.completedStage("test");
Expand All @@ -58,14 +113,46 @@ void shouldConvertCompletionStageNullResultToMaybe() {
assertEquals("empty", result);
}

@Test
void confirmCatchMaybeFromFuture() {
// This test confirms how RxJava's fromFuture works, so it can be replicated below
var future = CompletableFuture.failedFuture(textEx);
var maybe = Maybe.fromFuture(future);
var ex = assertThrows(RuntimeException.class, maybe::blockingGet);
ex.printStackTrace();
assertEquals("test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
void shouldCatchCompletionStageErrorsToMaybe() {
var future = CompletableFuture.failedStage(textEx);
var future = CompletableFuture.failedFuture(textEx);
var maybe = Rx3Util.toMaybe(future);
var ex = assertThrows(RuntimeException.class, maybe::blockingGet);
assertEquals("test error", ex.getMessage());
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
@SneakyThrows
void shouldCancelMaybe() {
var future = delayedErrorFuture();
var maybe = Rx3Util.toMaybe(future);
var disposable = maybe.subscribe();
disposable.dispose();
assertTrue(disposable.isDisposed());
assertTrue(future.isCancelled());
Thread.sleep(200);
assertNull(caughtError.get());
}

//////////////
// toCompletable()

@Test
void shouldConvertCompletionStageToCompletable() {
var future = CompletableFuture.completedStage((Void) null);
Expand All @@ -74,16 +161,52 @@ void shouldConvertCompletionStageToCompletable() {
assertTrue(complete.get());
}

@Test
void confirmCatchCompletableFromFuture() {
// This test confirms how RxJava's fromFuture works, so it can be replicated below
var future = CompletableFuture.runAsync(() -> {
throw textEx;
});
var completable = Completable.fromFuture(future);
var ex = assertThrows(RuntimeException.class, completable::blockingAwait);
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
void shouldCatchCompletionStageErrorsToCompletable() {
var future = CompletableFuture.runAsync(() -> {
throw textEx;
});
var completable = Rx3Util.toCompletable(future);
var ex = assertThrows(RuntimeException.class, completable::blockingAwait);
assertEquals("test error", ex.getMessage());
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.RuntimeException: test error",
ex.getMessage());

assertNull(caughtError.get());
}

@Test
@SneakyThrows
void shouldCancelCompletable() {
var future = delayedErrorFuture();
var completable = Rx3Util.toCompletable(future);
var disposable = completable.subscribe();
disposable.dispose();
assertTrue(disposable.isDisposed());
assertTrue(future.isCancelled());
Thread.sleep(200);
assertNull(caughtError.get());
}

//////////////
// wrapTransformerErrors()

@Test
void shouldWrapObservableTransformerErrors() {
var observable = Observable.just("result")
Expand Down Expand Up @@ -180,6 +303,9 @@ void shouldNotBlockResultsWhenWrappingFlowableTransformer() {
assertEquals("result", result);
}

//////////////
// Others

@Test
@SneakyThrows
void shouldZipAll() {
Expand Down Expand Up @@ -241,8 +367,9 @@ void shouldErrorCheckOrderOnUnorderedStuff() {
var result = Flowable.fromIterable(ints)
.compose(Rx3Util.checkOrder(Integer::compareTo))
.toList();
var e = assertThrows(RuntimeException.class, () -> result.blockingGet());
assertEquals("Stream isn't ordered - last: 5, current: 4", e.getMessage());
var ex = assertThrows(RuntimeException.class, () -> result.blockingGet());
ex.printStackTrace();
assertEquals("Stream isn't ordered - last: 5, current: 4", ex.getMessage());
}

@Test
Expand Down Expand Up @@ -281,12 +408,27 @@ void shouldRetryWithDelayPredicate() {
.compose(Rx3Util.retryWithDelayFlowable(
100, Duration.ofSeconds(1), e -> !e.getMessage().equals("test error 2")))
.blockingFirst();
var e = assertThrows(RuntimeException.class, func::get);
assertEquals("test error 2", e.getMessage());
var ex = assertThrows(RuntimeException.class, func::get);
ex.printStackTrace();
assertEquals("test error 2", ex.getMessage());
assertEquals(3, count.get());
assertEquals(3, times.size());
for (int j = 0; j < 2; j++) {
assertEquals(1000, Duration.between(times.get(j), times.get(j + 1)).toMillis(), 100);
}
}

//////////////
// Utils

private static CompletableFuture<Void> delayedErrorFuture() {
return CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new RuntimeException("delayed error");
});
}
}