From 9d53dda2e0f1882b93384619e270c108aa4de9e5 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Wed, 12 Mar 2025 09:43:53 +0900 Subject: [PATCH 1/4] Added verification tests to confirm how RxJava handles exceptions --- .gitignore | 1 + .../commons/rxjava3/Rx3UtilTest.java | 60 ++++++++++++++++--- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index c1ce86b..8ca135b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ .idea/ build/ out/ +.profileconfig.json diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index 29e0f85..be97156 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -5,10 +5,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import io.reactivex.rxjava3.core.Flowable; -import io.reactivex.rxjava3.core.FlowableTransformer; -import io.reactivex.rxjava3.core.Observable; -import io.reactivex.rxjava3.core.ObservableTransformer; +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subscribers.TestSubscriber; import java.time.Duration; @@ -19,8 +17,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class Rx3UtilTest { @@ -29,6 +30,18 @@ class Rx3UtilTest { RuntimeException textEx = new RuntimeException("test error"); + static final AtomicReference caughtError = new AtomicReference<>(); + + @BeforeAll + static void beforeAll() { + RxJavaPlugins.setErrorHandler(caughtError::set); + } + + @BeforeEach + void setup() { + caughtError.set(null); + } + @Test void shouldConvertCompletionStageToSingle() { var future = CompletableFuture.completedStage("test"); @@ -36,12 +49,23 @@ void shouldConvertCompletionStageToSingle() { assertEquals("test", result); } + @Test + void confirmCatchSingleFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var single = Single.fromFuture(future); + var ex = assertThrows(RuntimeException.class, single::blockingGet); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + } + @Test void shouldCatchCompletionStageErrorsToSingle() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var single = Rx3Util.toSingle(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); - assertEquals("test error", ex.getMessage()); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); } @Test @@ -58,9 +82,18 @@ void shouldConvertCompletionStageNullResultToMaybe() { assertEquals("empty", result); } + @Test + void confirmCatchMaybeFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var maybe = Maybe.fromFuture(future); + var ex = assertThrows(RuntimeException.class, maybe::blockingGet); + assertEquals("test error", ex.getMessage()); + } + @Test void shouldCatchCompletionStageErrorsToMaybe() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var maybe = Rx3Util.toMaybe(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); assertEquals("test error", ex.getMessage()); @@ -74,6 +107,16 @@ void shouldConvertCompletionStageToCompletable() { assertTrue(complete.get()); } + @Test + void confirmCatchCompletableFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var completable = Completable.fromFuture(future); + var ex = assertThrows(RuntimeException.class, completable::blockingAwait); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + } + @Test void shouldCatchCompletionStageErrorsToCompletable() { var future = CompletableFuture.runAsync(() -> { @@ -81,7 +124,8 @@ void shouldCatchCompletionStageErrorsToCompletable() { }); var completable = Rx3Util.toCompletable(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); - assertEquals("test error", ex.getMessage()); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); } @Test From dca491cee2d6d0d6bbd4d4c3eb889a6278553350 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Wed, 12 Mar 2025 10:28:58 +0900 Subject: [PATCH 2/4] More testing --- .../commons/rxjava3/Rx3UtilTest.java | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index be97156..2849e96 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -42,6 +42,9 @@ void setup() { caughtError.set(null); } + ////////////// + // toSingle() + @Test void shouldConvertCompletionStageToSingle() { var future = CompletableFuture.completedStage("test"); @@ -57,6 +60,8 @@ void confirmCatchSingleFromFuture() { var ex = assertThrows(RuntimeException.class, single::blockingGet); assertEquals( "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } @Test @@ -66,8 +71,26 @@ void shouldCatchCompletionStageErrorsToSingle() { var ex = assertThrows(RuntimeException.class, single::blockingGet); assertEquals( "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelSingle() { + var future = delayedErrorFuture(); + var single = Rx3Util.toSingle(future); + var disposable = single.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // toMaybe() + @Test void shouldConvertCompletionStageToMaybe() { var future = CompletableFuture.completedStage("test"); @@ -89,6 +112,8 @@ void confirmCatchMaybeFromFuture() { var maybe = Maybe.fromFuture(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); assertEquals("test error", ex.getMessage()); + + assertNull(caughtError.get()); } @Test @@ -97,8 +122,26 @@ void shouldCatchCompletionStageErrorsToMaybe() { var maybe = Rx3Util.toMaybe(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); assertEquals("test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + + @Test + @SneakyThrows + void shouldCancelMaybe() { + var future = delayedErrorFuture(); + var maybe = Rx3Util.toMaybe(future); + var disposable = maybe.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); } + ////////////// + // toCompletable() + @Test void shouldConvertCompletionStageToCompletable() { var future = CompletableFuture.completedStage((Void) null); @@ -115,6 +158,8 @@ void confirmCatchCompletableFromFuture() { var ex = assertThrows(RuntimeException.class, completable::blockingAwait); assertEquals( "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } @Test @@ -126,8 +171,26 @@ void shouldCatchCompletionStageErrorsToCompletable() { var ex = assertThrows(RuntimeException.class, completable::blockingAwait); assertEquals( "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelCompletable() { + var future = delayedErrorFuture(); + var completable = Rx3Util.toCompletable(future); + var disposable = completable.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // wrapTransformerErrors() + @Test void shouldWrapObservableTransformerErrors() { var observable = Observable.just("result") @@ -224,6 +287,9 @@ void shouldNotBlockResultsWhenWrappingFlowableTransformer() { assertEquals("result", result); } + ////////////// + // Others + @Test @SneakyThrows void shouldZipAll() { @@ -333,4 +399,18 @@ void shouldRetryWithDelayPredicate() { assertEquals(1000, Duration.between(times.get(j), times.get(j + 1)).toMillis(), 100); } } + + ////////////// + // Utils + + private static CompletableFuture delayedErrorFuture() { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("delayed error"); + }); + } } From 3b026f1e7e6296c60366c736452c163ab9122785 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Wed, 12 Mar 2025 11:20:56 +0900 Subject: [PATCH 3/4] Imports --- .../com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index 2849e96..fe63cc8 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -5,7 +5,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableTransformer; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.ObservableTransformer; +import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subscribers.TestSubscriber; From 8c9e9b79c8510d6e5c25a01e9b68d09c636a0256 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Wed, 12 Mar 2025 12:27:24 +0900 Subject: [PATCH 4/4] Standard exception wrapping --- .../commons/rxjava3/Rx3Util.java | 7 ++--- .../commons/rxjava3/Rx3UtilTest.java | 26 ++++++++++++++----- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 206b7bc..d1ca60f 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -19,6 +19,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -47,7 +48,7 @@ public static Single toSingle(CompletionStage future) { future.whenComplete((result, throwable) -> { if (!emitter.isDisposed()) { if (throwable != null) { - emitter.onError(throwable); + emitter.onError(new ExecutionException(throwable)); } else if (result != null) { emitter.onSuccess(result); } else { @@ -79,7 +80,7 @@ public static Maybe toMaybe(CompletionStage future) { future.whenComplete((result, throwable) -> { if (!emitter.isDisposed()) { if (throwable != null) { - emitter.onError(throwable); + emitter.onError(new ExecutionException(throwable)); } else if (result != null) { emitter.onSuccess(result); } else { @@ -110,7 +111,7 @@ public static Completable toCompletable(CompletionStage future) { if (throwable != null) { // Emit only the cause message if (throwable.getCause() != null) { - emitter.onError(throwable.getCause()); + emitter.onError(new ExecutionException(throwable)); } else { emitter.onError(throwable); } diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index fe63cc8..853d069 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -64,6 +64,7 @@ void confirmCatchSingleFromFuture() { var future = CompletableFuture.failedFuture(textEx); var single = Single.fromFuture(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); + ex.printStackTrace(); assertEquals( "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); @@ -75,6 +76,7 @@ void shouldCatchCompletionStageErrorsToSingle() { var future = CompletableFuture.failedFuture(textEx); var single = Rx3Util.toSingle(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); + ex.printStackTrace(); assertEquals( "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); @@ -117,6 +119,7 @@ void confirmCatchMaybeFromFuture() { var future = CompletableFuture.failedFuture(textEx); var maybe = Maybe.fromFuture(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); + ex.printStackTrace(); assertEquals("test error", ex.getMessage()); assertNull(caughtError.get()); @@ -127,7 +130,9 @@ void shouldCatchCompletionStageErrorsToMaybe() { var future = CompletableFuture.failedFuture(textEx); var maybe = Rx3Util.toMaybe(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); - assertEquals("test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); assertNull(caughtError.get()); } @@ -159,9 +164,12 @@ void shouldConvertCompletionStageToCompletable() { @Test void confirmCatchCompletableFromFuture() { // This test confirms how RxJava's fromFuture works, so it can be replicated below - var future = CompletableFuture.failedFuture(textEx); + var future = CompletableFuture.runAsync(() -> { + throw textEx; + }); var completable = Completable.fromFuture(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); + ex.printStackTrace(); assertEquals( "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); @@ -175,8 +183,10 @@ void shouldCatchCompletionStageErrorsToCompletable() { }); var completable = Rx3Util.toCompletable(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); + ex.printStackTrace(); assertEquals( - "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + "java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.RuntimeException: test error", + ex.getMessage()); assertNull(caughtError.get()); } @@ -357,8 +367,9 @@ void shouldErrorCheckOrderOnUnorderedStuff() { var result = Flowable.fromIterable(ints) .compose(Rx3Util.checkOrder(Integer::compareTo)) .toList(); - var e = assertThrows(RuntimeException.class, () -> result.blockingGet()); - assertEquals("Stream isn't ordered - last: 5, current: 4", e.getMessage()); + var ex = assertThrows(RuntimeException.class, () -> result.blockingGet()); + ex.printStackTrace(); + assertEquals("Stream isn't ordered - last: 5, current: 4", ex.getMessage()); } @Test @@ -397,8 +408,9 @@ void shouldRetryWithDelayPredicate() { .compose(Rx3Util.retryWithDelayFlowable( 100, Duration.ofSeconds(1), e -> !e.getMessage().equals("test error 2"))) .blockingFirst(); - var e = assertThrows(RuntimeException.class, func::get); - assertEquals("test error 2", e.getMessage()); + var ex = assertThrows(RuntimeException.class, func::get); + ex.printStackTrace(); + assertEquals("test error 2", ex.getMessage()); assertEquals(3, count.get()); assertEquals(3, times.size()); for (int j = 0; j < 2; j++) {