From 663aaba0356592c012db690b409d63ac1bd028a4 Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 13:42:29 -0500 Subject: [PATCH 1/9] Refactor Rx3Util to Use Single.defer and Maybe.defer for Non-Blocking Operations --- .../commons/rxjava3/Rx3Util.java | 31 +++---------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 7517ff0..2adfbc3 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -35,22 +35,12 @@ private Rx3Util() {} * Null return values will result in an error from RxJava, as those aren't allowed. * Use {@link #toMaybe(CompletionStage)} instead to handle null values properly. * - * {@link Single#fromFuture(Future)} works in a blocking fashion, whereas {@link CompletionStage} can be utilised to avoid blocking calls. - * * @param future the completion stage * @return the Single * @param the return parameter of the future */ public static Single toSingle(CompletionStage future) { - return Single.create(subscriber -> { - future.thenAccept(result -> { - subscriber.onSuccess(result); - }) - .exceptionally(e -> { - subscriber.onError(e); - return null; - }); - }); + return Single.defer(() -> Single.fromFuture(future.toCompletableFuture())); } /** @@ -58,26 +48,15 @@ public static Single toSingle(CompletionStage future) { * * Null return values will result in an empty Maybe. * - * {@link Maybe#fromFuture(Future)} works in a blocking fashion, whereas {@link CompletionStage} can be utilised to avoid blocking calls. - * * @param future the completion stage * @return the Maybe * @param the return parameter of the future */ public static Maybe toMaybe(CompletionStage future) { - return Maybe.create(subscriber -> { - future.thenAccept(result -> { - if (result == null) { - subscriber.onComplete(); - } else { - subscriber.onSuccess(result); - } - }) - .exceptionally(e -> { - subscriber.onError(e); - return null; - }); - }); + return Maybe.defer( + () -> Maybe.fromFuture(future.toCompletableFuture()) + .switchIfEmpty(Maybe.empty()) // Ensure empty Maybe for null values + ); } /** From 15fabe888e06d59e0aaaad9ef979a5267f1f92ac Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 14:08:07 -0500 Subject: [PATCH 2/9] fix: update tests --- .../java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index c2d2e12..884f389 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -41,7 +41,7 @@ void shouldCatchCompletionStageErrorsToSingle() { var future = CompletableFuture.failedStage(textEx); var single = Rx3Util.toSingle(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); - assertEquals("java.lang.RuntimeException: test error", ex.getMessage()); + assertEquals("java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); } @Test @@ -63,7 +63,7 @@ void shouldCatchCompletionStageErrorsToMaybe() { var future = CompletableFuture.failedStage(textEx); var maybe = Rx3Util.toMaybe(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); - assertEquals("java.lang.RuntimeException: test error", ex.getMessage()); + assertEquals("test error", ex.getMessage()); } @Test From 72e02cf26f9ef050a4d95f26a31267118c5e3f1c Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 21:14:32 -0500 Subject: [PATCH 3/9] fix --- .../commons/rxjava3/Rx3Util.java | 62 +++++++++++++++++-- .../commons/rxjava3/Rx3UtilTest.java | 2 +- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 2adfbc3..767f3cc 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -17,6 +17,7 @@ import java.time.Duration; import java.util.Comparator; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -40,7 +41,34 @@ private Rx3Util() {} * @param the return parameter of the future */ public static Single toSingle(CompletionStage future) { - return Single.defer(() -> Single.fromFuture(future.toCompletableFuture())); + return Single.create(emitter -> { + // Efficiently handle completion + future.whenComplete((result, throwable) -> { + if (throwable != null) { + // Emit an error if a throwable is present + if (!emitter.isDisposed()) { + emitter.onError(throwable); + } + } else if (result != null) { + // Emit the result if not null + if (!emitter.isDisposed()) { + emitter.onSuccess(result); + } + } else { + // Single does not support null results; emit a NullPointerException + if (!emitter.isDisposed()) { + emitter.onError(new NullPointerException("CompletionStage completed with a null result")); + } + } + }); + // Handle cancellation efficiently + emitter.setCancellable(() -> { + if (future instanceof CompletableFuture) { + CompletableFuture completableFuture = (CompletableFuture) future; + completableFuture.cancel(false); // Avoid interruption unless necessary + } + }); + }); } /** @@ -53,10 +81,34 @@ public static Single toSingle(CompletionStage future) { * @param the return parameter of the future */ public static Maybe toMaybe(CompletionStage future) { - return Maybe.defer( - () -> Maybe.fromFuture(future.toCompletableFuture()) - .switchIfEmpty(Maybe.empty()) // Ensure empty Maybe for null values - ); + return Maybe.create(emitter -> { + // Efficiently handle completion + future.whenComplete((result, throwable) -> { + if (throwable != null) { + // Emit an error if a throwable is present + if (!emitter.isDisposed()) { + emitter.onError(throwable); + } + } else if (result != null) { + // Emit the result if not null + if (!emitter.isDisposed()) { + emitter.onSuccess(result); + } + } else { + // Complete the Maybe if result is null + if (!emitter.isDisposed()) { + emitter.onComplete(); + } + } + }); + // Handle cancellation efficiently + emitter.setCancellable(() -> { + if (future instanceof CompletableFuture) { + CompletableFuture completableFuture = (CompletableFuture) future; + completableFuture.cancel(false); // Avoid interruption unless necessary + } + }); + }); } /** diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index 884f389..c3705fa 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -41,7 +41,7 @@ void shouldCatchCompletionStageErrorsToSingle() { var future = CompletableFuture.failedStage(textEx); var single = Rx3Util.toSingle(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); - assertEquals("java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + assertEquals("test error", ex.getMessage()); } @Test From c37eacc08e6992af512b8d37ffc805a4951e0fde Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 21:30:04 -0500 Subject: [PATCH 4/9] fix comment --- .../java/com/autonomouslogic/commons/rxjava3/Rx3Util.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 767f3cc..4afaf53 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -36,6 +36,8 @@ private Rx3Util() {} * Null return values will result in an error from RxJava, as those aren't allowed. * Use {@link #toMaybe(CompletionStage)} instead to handle null values properly. * + * {@link Single#fromFuture(Future)} works in a blocking fashion, whereas {@link CompletionStage} can be utilised to avoid blocking calls. + * * @param future the completion stage * @return the Single * @param the return parameter of the future @@ -76,6 +78,8 @@ public static Single toSingle(CompletionStage future) { * * Null return values will result in an empty Maybe. * + * {@link Maybe#fromFuture(Future)} works in a blocking fashion, whereas {@link CompletionStage} can be utilised to avoid blocking calls. + * * @param future the completion stage * @return the Maybe * @param the return parameter of the future From 0358f838789290e0eda57e30fb52c422061264ba Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 21:30:33 -0500 Subject: [PATCH 5/9] spacing --- src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 4afaf53..3d41948 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -79,7 +79,7 @@ public static Single toSingle(CompletionStage future) { * Null return values will result in an empty Maybe. * * {@link Maybe#fromFuture(Future)} works in a blocking fashion, whereas {@link CompletionStage} can be utilised to avoid blocking calls. - * + * * @param future the completion stage * @return the Maybe * @param the return parameter of the future From b482cf6ae7ea79ae29b499b8bc4d745b7eeedd6c Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 21:40:34 -0500 Subject: [PATCH 6/9] fix --- .../commons/rxjava3/Rx3Util.java | 40 +++++-------------- 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 3d41948..f778639 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -44,30 +44,20 @@ private Rx3Util() {} */ public static Single toSingle(CompletionStage future) { return Single.create(emitter -> { - // Efficiently handle completion future.whenComplete((result, throwable) -> { - if (throwable != null) { - // Emit an error if a throwable is present - if (!emitter.isDisposed()) { + if (!emitter.isDisposed()) { + if (throwable != null) { emitter.onError(throwable); - } - } else if (result != null) { - // Emit the result if not null - if (!emitter.isDisposed()) { + } else if (result != null) { emitter.onSuccess(result); - } - } else { - // Single does not support null results; emit a NullPointerException - if (!emitter.isDisposed()) { + } else { emitter.onError(new NullPointerException("CompletionStage completed with a null result")); } } }); - // Handle cancellation efficiently emitter.setCancellable(() -> { if (future instanceof CompletableFuture) { - CompletableFuture completableFuture = (CompletableFuture) future; - completableFuture.cancel(false); // Avoid interruption unless necessary + ((CompletableFuture) future).cancel(false); } }); }); @@ -86,30 +76,20 @@ public static Single toSingle(CompletionStage future) { */ public static Maybe toMaybe(CompletionStage future) { return Maybe.create(emitter -> { - // Efficiently handle completion future.whenComplete((result, throwable) -> { - if (throwable != null) { - // Emit an error if a throwable is present - if (!emitter.isDisposed()) { + if (!emitter.isDisposed()) { + if (throwable != null) { emitter.onError(throwable); - } - } else if (result != null) { - // Emit the result if not null - if (!emitter.isDisposed()) { + } else if (result != null) { emitter.onSuccess(result); - } - } else { - // Complete the Maybe if result is null - if (!emitter.isDisposed()) { + } else { emitter.onComplete(); } } }); - // Handle cancellation efficiently emitter.setCancellable(() -> { if (future instanceof CompletableFuture) { - CompletableFuture completableFuture = (CompletableFuture) future; - completableFuture.cancel(false); // Avoid interruption unless necessary + ((CompletableFuture) future).cancel(false); } }); }); From 0918168671b809edd360f80d3bd614d1c1ae192f Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 21:54:15 -0500 Subject: [PATCH 7/9] fix --- .../commons/rxjava3/Rx3Util.java | 29 ++++++++++++++----- .../commons/rxjava3/Rx3UtilTest.java | 6 ++-- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index f778639..206b7bc 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -104,14 +104,27 @@ public static Maybe toMaybe(CompletionStage future) { * @return the Completable */ public static Completable toCompletable(CompletionStage future) { - return Completable.create(subscriber -> { - future.thenAccept(ignore -> { - subscriber.onComplete(); - }) - .exceptionally(e -> { - subscriber.onError(e); - return null; - }); + return Completable.create(emitter -> { + future.whenComplete((result, throwable) -> { + if (!emitter.isDisposed()) { + if (throwable != null) { + // Emit only the cause message + if (throwable.getCause() != null) { + emitter.onError(throwable.getCause()); + } else { + emitter.onError(throwable); + } + } else { + emitter.onComplete(); + } + } + }); + + emitter.setCancellable(() -> { + if (future instanceof CompletableFuture) { + ((CompletableFuture) future).cancel(false); + } + }); }); } diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index c3705fa..86677dd 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -76,12 +76,12 @@ void shouldConvertCompletionStageToCompletable() { @Test void shouldCatchCompletionStageErrorsToCompletable() { - var future = CompletableFuture.supplyAsync((Supplier) () -> { - throw textEx; + var future = CompletableFuture.runAsync(() -> { + throw new RuntimeException("test error"); }); var completable = Rx3Util.toCompletable(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); - assertEquals("java.lang.RuntimeException: test error", ex.getMessage()); + assertEquals("test error", ex.getMessage()); } @Test From 80b216d52047597b9fa91f845ca45f7a03ab4fd3 Mon Sep 17 00:00:00 2001 From: James Guyer <108893641+jamesoguyer@users.noreply.github.com> Date: Mon, 20 Jan 2025 21:55:21 -0500 Subject: [PATCH 8/9] fix --- .../java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index 86677dd..29e0f85 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -77,7 +77,7 @@ void shouldConvertCompletionStageToCompletable() { @Test void shouldCatchCompletionStageErrorsToCompletable() { var future = CompletableFuture.runAsync(() -> { - throw new RuntimeException("test error"); + throw textEx; }); var completable = Rx3Util.toCompletable(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); From 40b48dbef862374f9a1abcc82f949aff057ede23 Mon Sep 17 00:00:00 2001 From: Kenneth Jorgensen Date: Wed, 12 Mar 2025 12:33:19 +0900 Subject: [PATCH 9/9] Applied changes from #1 --- .../commons/rxjava3/Rx3Util.java | 7 +- .../commons/rxjava3/Rx3UtilTest.java | 160 +++++++++++++++++- 2 files changed, 155 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 206b7bc..d1ca60f 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -19,6 +19,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -47,7 +48,7 @@ public static Single toSingle(CompletionStage future) { future.whenComplete((result, throwable) -> { if (!emitter.isDisposed()) { if (throwable != null) { - emitter.onError(throwable); + emitter.onError(new ExecutionException(throwable)); } else if (result != null) { emitter.onSuccess(result); } else { @@ -79,7 +80,7 @@ public static Maybe toMaybe(CompletionStage future) { future.whenComplete((result, throwable) -> { if (!emitter.isDisposed()) { if (throwable != null) { - emitter.onError(throwable); + emitter.onError(new ExecutionException(throwable)); } else if (result != null) { emitter.onSuccess(result); } else { @@ -110,7 +111,7 @@ public static Completable toCompletable(CompletionStage future) { if (throwable != null) { // Emit only the cause message if (throwable.getCause() != null) { - emitter.onError(throwable.getCause()); + emitter.onError(new ExecutionException(throwable)); } else { emitter.onError(throwable); } diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index 29e0f85..853d069 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -5,10 +5,14 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.FlowableTransformer; +import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableTransformer; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subscribers.TestSubscriber; import java.time.Duration; @@ -19,8 +23,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class Rx3UtilTest { @@ -29,6 +36,21 @@ class Rx3UtilTest { RuntimeException textEx = new RuntimeException("test error"); + static final AtomicReference caughtError = new AtomicReference<>(); + + @BeforeAll + static void beforeAll() { + RxJavaPlugins.setErrorHandler(caughtError::set); + } + + @BeforeEach + void setup() { + caughtError.set(null); + } + + ////////////// + // toSingle() + @Test void shouldConvertCompletionStageToSingle() { var future = CompletableFuture.completedStage("test"); @@ -36,14 +58,47 @@ void shouldConvertCompletionStageToSingle() { assertEquals("test", result); } + @Test + void confirmCatchSingleFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var single = Single.fromFuture(future); + var ex = assertThrows(RuntimeException.class, single::blockingGet); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToSingle() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var single = Rx3Util.toSingle(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); - assertEquals("test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelSingle() { + var future = delayedErrorFuture(); + var single = Rx3Util.toSingle(future); + var disposable = single.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // toMaybe() + @Test void shouldConvertCompletionStageToMaybe() { var future = CompletableFuture.completedStage("test"); @@ -58,14 +113,46 @@ void shouldConvertCompletionStageNullResultToMaybe() { assertEquals("empty", result); } + @Test + void confirmCatchMaybeFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var maybe = Maybe.fromFuture(future); + var ex = assertThrows(RuntimeException.class, maybe::blockingGet); + ex.printStackTrace(); + assertEquals("test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToMaybe() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var maybe = Rx3Util.toMaybe(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); - assertEquals("test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelMaybe() { + var future = delayedErrorFuture(); + var maybe = Rx3Util.toMaybe(future); + var disposable = maybe.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // toCompletable() + @Test void shouldConvertCompletionStageToCompletable() { var future = CompletableFuture.completedStage((Void) null); @@ -74,6 +161,21 @@ void shouldConvertCompletionStageToCompletable() { assertTrue(complete.get()); } + @Test + void confirmCatchCompletableFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.runAsync(() -> { + throw textEx; + }); + var completable = Completable.fromFuture(future); + var ex = assertThrows(RuntimeException.class, completable::blockingAwait); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToCompletable() { var future = CompletableFuture.runAsync(() -> { @@ -81,9 +183,30 @@ void shouldCatchCompletionStageErrorsToCompletable() { }); var completable = Rx3Util.toCompletable(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); - assertEquals("test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.RuntimeException: test error", + ex.getMessage()); + + assertNull(caughtError.get()); + } + + @Test + @SneakyThrows + void shouldCancelCompletable() { + var future = delayedErrorFuture(); + var completable = Rx3Util.toCompletable(future); + var disposable = completable.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); } + ////////////// + // wrapTransformerErrors() + @Test void shouldWrapObservableTransformerErrors() { var observable = Observable.just("result") @@ -180,6 +303,9 @@ void shouldNotBlockResultsWhenWrappingFlowableTransformer() { assertEquals("result", result); } + ////////////// + // Others + @Test @SneakyThrows void shouldZipAll() { @@ -241,8 +367,9 @@ void shouldErrorCheckOrderOnUnorderedStuff() { var result = Flowable.fromIterable(ints) .compose(Rx3Util.checkOrder(Integer::compareTo)) .toList(); - var e = assertThrows(RuntimeException.class, () -> result.blockingGet()); - assertEquals("Stream isn't ordered - last: 5, current: 4", e.getMessage()); + var ex = assertThrows(RuntimeException.class, () -> result.blockingGet()); + ex.printStackTrace(); + assertEquals("Stream isn't ordered - last: 5, current: 4", ex.getMessage()); } @Test @@ -281,12 +408,27 @@ void shouldRetryWithDelayPredicate() { .compose(Rx3Util.retryWithDelayFlowable( 100, Duration.ofSeconds(1), e -> !e.getMessage().equals("test error 2"))) .blockingFirst(); - var e = assertThrows(RuntimeException.class, func::get); - assertEquals("test error 2", e.getMessage()); + var ex = assertThrows(RuntimeException.class, func::get); + ex.printStackTrace(); + assertEquals("test error 2", ex.getMessage()); assertEquals(3, count.get()); assertEquals(3, times.size()); for (int j = 0; j < 2; j++) { assertEquals(1000, Duration.between(times.get(j), times.get(j + 1)).toMillis(), 100); } } + + ////////////// + // Utils + + private static CompletableFuture delayedErrorFuture() { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("delayed error"); + }); + } }