diff --git a/.gitignore b/.gitignore index c1ce86b..8ca135b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ .idea/ build/ out/ +.profileconfig.json diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 206b7bc..d1ca60f 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -19,6 +19,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -47,7 +48,7 @@ public static Single toSingle(CompletionStage future) { future.whenComplete((result, throwable) -> { if (!emitter.isDisposed()) { if (throwable != null) { - emitter.onError(throwable); + emitter.onError(new ExecutionException(throwable)); } else if (result != null) { emitter.onSuccess(result); } else { @@ -79,7 +80,7 @@ public static Maybe toMaybe(CompletionStage future) { future.whenComplete((result, throwable) -> { if (!emitter.isDisposed()) { if (throwable != null) { - emitter.onError(throwable); + emitter.onError(new ExecutionException(throwable)); } else if (result != null) { emitter.onSuccess(result); } else { @@ -110,7 +111,7 @@ public static Completable toCompletable(CompletionStage future) { if (throwable != null) { // Emit only the cause message if (throwable.getCause() != null) { - emitter.onError(throwable.getCause()); + emitter.onError(new ExecutionException(throwable)); } else { emitter.onError(throwable); } diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index 29e0f85..853d069 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -5,10 +5,14 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.FlowableTransformer; +import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableTransformer; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subscribers.TestSubscriber; import java.time.Duration; @@ -19,8 +23,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class Rx3UtilTest { @@ -29,6 +36,21 @@ class Rx3UtilTest { RuntimeException textEx = new RuntimeException("test error"); + static final AtomicReference caughtError = new AtomicReference<>(); + + @BeforeAll + static void beforeAll() { + RxJavaPlugins.setErrorHandler(caughtError::set); + } + + @BeforeEach + void setup() { + caughtError.set(null); + } + + ////////////// + // toSingle() + @Test void shouldConvertCompletionStageToSingle() { var future = CompletableFuture.completedStage("test"); @@ -36,14 +58,47 @@ void shouldConvertCompletionStageToSingle() { assertEquals("test", result); } + @Test + void confirmCatchSingleFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var single = Single.fromFuture(future); + var ex = assertThrows(RuntimeException.class, single::blockingGet); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToSingle() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var single = Rx3Util.toSingle(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); - assertEquals("test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelSingle() { + var future = delayedErrorFuture(); + var single = Rx3Util.toSingle(future); + var disposable = single.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // toMaybe() + @Test void shouldConvertCompletionStageToMaybe() { var future = CompletableFuture.completedStage("test"); @@ -58,14 +113,46 @@ void shouldConvertCompletionStageNullResultToMaybe() { assertEquals("empty", result); } + @Test + void confirmCatchMaybeFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var maybe = Maybe.fromFuture(future); + var ex = assertThrows(RuntimeException.class, maybe::blockingGet); + ex.printStackTrace(); + assertEquals("test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToMaybe() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var maybe = Rx3Util.toMaybe(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); - assertEquals("test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelMaybe() { + var future = delayedErrorFuture(); + var maybe = Rx3Util.toMaybe(future); + var disposable = maybe.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // toCompletable() + @Test void shouldConvertCompletionStageToCompletable() { var future = CompletableFuture.completedStage((Void) null); @@ -74,6 +161,21 @@ void shouldConvertCompletionStageToCompletable() { assertTrue(complete.get()); } + @Test + void confirmCatchCompletableFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.runAsync(() -> { + throw textEx; + }); + var completable = Completable.fromFuture(future); + var ex = assertThrows(RuntimeException.class, completable::blockingAwait); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToCompletable() { var future = CompletableFuture.runAsync(() -> { @@ -81,9 +183,30 @@ void shouldCatchCompletionStageErrorsToCompletable() { }); var completable = Rx3Util.toCompletable(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); - assertEquals("test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.RuntimeException: test error", + ex.getMessage()); + + assertNull(caughtError.get()); + } + + @Test + @SneakyThrows + void shouldCancelCompletable() { + var future = delayedErrorFuture(); + var completable = Rx3Util.toCompletable(future); + var disposable = completable.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); } + ////////////// + // wrapTransformerErrors() + @Test void shouldWrapObservableTransformerErrors() { var observable = Observable.just("result") @@ -180,6 +303,9 @@ void shouldNotBlockResultsWhenWrappingFlowableTransformer() { assertEquals("result", result); } + ////////////// + // Others + @Test @SneakyThrows void shouldZipAll() { @@ -241,8 +367,9 @@ void shouldErrorCheckOrderOnUnorderedStuff() { var result = Flowable.fromIterable(ints) .compose(Rx3Util.checkOrder(Integer::compareTo)) .toList(); - var e = assertThrows(RuntimeException.class, () -> result.blockingGet()); - assertEquals("Stream isn't ordered - last: 5, current: 4", e.getMessage()); + var ex = assertThrows(RuntimeException.class, () -> result.blockingGet()); + ex.printStackTrace(); + assertEquals("Stream isn't ordered - last: 5, current: 4", ex.getMessage()); } @Test @@ -281,12 +408,27 @@ void shouldRetryWithDelayPredicate() { .compose(Rx3Util.retryWithDelayFlowable( 100, Duration.ofSeconds(1), e -> !e.getMessage().equals("test error 2"))) .blockingFirst(); - var e = assertThrows(RuntimeException.class, func::get); - assertEquals("test error 2", e.getMessage()); + var ex = assertThrows(RuntimeException.class, func::get); + ex.printStackTrace(); + assertEquals("test error 2", ex.getMessage()); assertEquals(3, count.get()); assertEquals(3, times.size()); for (int j = 0; j < 2; j++) { assertEquals(1000, Duration.between(times.get(j), times.get(j + 1)).toMillis(), 100); } } + + ////////////// + // Utils + + private static CompletableFuture delayedErrorFuture() { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("delayed error"); + }); + } }