diff --git a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java index 7517ff0..d1ca60f 100644 --- a/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java +++ b/src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java @@ -17,7 +17,9 @@ import java.time.Duration; import java.util.Comparator; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -42,14 +44,23 @@ private Rx3Util() {} * @param the return parameter of the future */ public static Single toSingle(CompletionStage future) { - return Single.create(subscriber -> { - future.thenAccept(result -> { - subscriber.onSuccess(result); - }) - .exceptionally(e -> { - subscriber.onError(e); - return null; - }); + return Single.create(emitter -> { + future.whenComplete((result, throwable) -> { + if (!emitter.isDisposed()) { + if (throwable != null) { + emitter.onError(new ExecutionException(throwable)); + } else if (result != null) { + emitter.onSuccess(result); + } else { + emitter.onError(new NullPointerException("CompletionStage completed with a null result")); + } + } + }); + emitter.setCancellable(() -> { + if (future instanceof CompletableFuture) { + ((CompletableFuture) future).cancel(false); + } + }); }); } @@ -65,18 +76,23 @@ public static Single toSingle(CompletionStage future) { * @param the return parameter of the future */ public static Maybe toMaybe(CompletionStage future) { - return Maybe.create(subscriber -> { - future.thenAccept(result -> { - if (result == null) { - subscriber.onComplete(); - } else { - subscriber.onSuccess(result); - } - }) - .exceptionally(e -> { - subscriber.onError(e); - return null; - }); + return Maybe.create(emitter -> { + future.whenComplete((result, throwable) -> { + if (!emitter.isDisposed()) { + if (throwable != null) { + emitter.onError(new ExecutionException(throwable)); + } else if (result != null) { + emitter.onSuccess(result); + } else { + emitter.onComplete(); + } + } + }); + emitter.setCancellable(() -> { + if (future instanceof CompletableFuture) { + ((CompletableFuture) future).cancel(false); + } + }); }); } @@ -89,14 +105,27 @@ public static Maybe toMaybe(CompletionStage future) { * @return the Completable */ public static Completable toCompletable(CompletionStage future) { - return Completable.create(subscriber -> { - future.thenAccept(ignore -> { - subscriber.onComplete(); - }) - .exceptionally(e -> { - subscriber.onError(e); - return null; - }); + return Completable.create(emitter -> { + future.whenComplete((result, throwable) -> { + if (!emitter.isDisposed()) { + if (throwable != null) { + // Emit only the cause message + if (throwable.getCause() != null) { + emitter.onError(new ExecutionException(throwable)); + } else { + emitter.onError(throwable); + } + } else { + emitter.onComplete(); + } + } + }); + + emitter.setCancellable(() -> { + if (future instanceof CompletableFuture) { + ((CompletableFuture) future).cancel(false); + } + }); }); } diff --git a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java index c2d2e12..853d069 100644 --- a/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java +++ b/src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java @@ -5,10 +5,14 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.reactivex.rxjava3.core.Completable; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.FlowableTransformer; +import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableTransformer; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subscribers.TestSubscriber; import java.time.Duration; @@ -19,8 +23,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; class Rx3UtilTest { @@ -29,6 +36,21 @@ class Rx3UtilTest { RuntimeException textEx = new RuntimeException("test error"); + static final AtomicReference caughtError = new AtomicReference<>(); + + @BeforeAll + static void beforeAll() { + RxJavaPlugins.setErrorHandler(caughtError::set); + } + + @BeforeEach + void setup() { + caughtError.set(null); + } + + ////////////// + // toSingle() + @Test void shouldConvertCompletionStageToSingle() { var future = CompletableFuture.completedStage("test"); @@ -36,14 +58,47 @@ void shouldConvertCompletionStageToSingle() { assertEquals("test", result); } + @Test + void confirmCatchSingleFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var single = Single.fromFuture(future); + var ex = assertThrows(RuntimeException.class, single::blockingGet); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToSingle() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var single = Rx3Util.toSingle(future); var ex = assertThrows(RuntimeException.class, single::blockingGet); - assertEquals("java.lang.RuntimeException: test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelSingle() { + var future = delayedErrorFuture(); + var single = Rx3Util.toSingle(future); + var disposable = single.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // toMaybe() + @Test void shouldConvertCompletionStageToMaybe() { var future = CompletableFuture.completedStage("test"); @@ -58,14 +113,46 @@ void shouldConvertCompletionStageNullResultToMaybe() { assertEquals("empty", result); } + @Test + void confirmCatchMaybeFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.failedFuture(textEx); + var maybe = Maybe.fromFuture(future); + var ex = assertThrows(RuntimeException.class, maybe::blockingGet); + ex.printStackTrace(); + assertEquals("test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToMaybe() { - var future = CompletableFuture.failedStage(textEx); + var future = CompletableFuture.failedFuture(textEx); var maybe = Rx3Util.toMaybe(future); var ex = assertThrows(RuntimeException.class, maybe::blockingGet); - assertEquals("java.lang.RuntimeException: test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelMaybe() { + var future = delayedErrorFuture(); + var maybe = Rx3Util.toMaybe(future); + var disposable = maybe.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // toCompletable() + @Test void shouldConvertCompletionStageToCompletable() { var future = CompletableFuture.completedStage((Void) null); @@ -74,16 +161,52 @@ void shouldConvertCompletionStageToCompletable() { assertTrue(complete.get()); } + @Test + void confirmCatchCompletableFromFuture() { + // This test confirms how RxJava's fromFuture works, so it can be replicated below + var future = CompletableFuture.runAsync(() -> { + throw textEx; + }); + var completable = Completable.fromFuture(future); + var ex = assertThrows(RuntimeException.class, completable::blockingAwait); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage()); + + assertNull(caughtError.get()); + } + @Test void shouldCatchCompletionStageErrorsToCompletable() { - var future = CompletableFuture.supplyAsync((Supplier) () -> { + var future = CompletableFuture.runAsync(() -> { throw textEx; }); var completable = Rx3Util.toCompletable(future); var ex = assertThrows(RuntimeException.class, completable::blockingAwait); - assertEquals("java.lang.RuntimeException: test error", ex.getMessage()); + ex.printStackTrace(); + assertEquals( + "java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.RuntimeException: test error", + ex.getMessage()); + + assertNull(caughtError.get()); } + @Test + @SneakyThrows + void shouldCancelCompletable() { + var future = delayedErrorFuture(); + var completable = Rx3Util.toCompletable(future); + var disposable = completable.subscribe(); + disposable.dispose(); + assertTrue(disposable.isDisposed()); + assertTrue(future.isCancelled()); + Thread.sleep(200); + assertNull(caughtError.get()); + } + + ////////////// + // wrapTransformerErrors() + @Test void shouldWrapObservableTransformerErrors() { var observable = Observable.just("result") @@ -180,6 +303,9 @@ void shouldNotBlockResultsWhenWrappingFlowableTransformer() { assertEquals("result", result); } + ////////////// + // Others + @Test @SneakyThrows void shouldZipAll() { @@ -241,8 +367,9 @@ void shouldErrorCheckOrderOnUnorderedStuff() { var result = Flowable.fromIterable(ints) .compose(Rx3Util.checkOrder(Integer::compareTo)) .toList(); - var e = assertThrows(RuntimeException.class, () -> result.blockingGet()); - assertEquals("Stream isn't ordered - last: 5, current: 4", e.getMessage()); + var ex = assertThrows(RuntimeException.class, () -> result.blockingGet()); + ex.printStackTrace(); + assertEquals("Stream isn't ordered - last: 5, current: 4", ex.getMessage()); } @Test @@ -281,12 +408,27 @@ void shouldRetryWithDelayPredicate() { .compose(Rx3Util.retryWithDelayFlowable( 100, Duration.ofSeconds(1), e -> !e.getMessage().equals("test error 2"))) .blockingFirst(); - var e = assertThrows(RuntimeException.class, func::get); - assertEquals("test error 2", e.getMessage()); + var ex = assertThrows(RuntimeException.class, func::get); + ex.printStackTrace(); + assertEquals("test error 2", ex.getMessage()); assertEquals(3, count.get()); assertEquals(3, times.size()); for (int j = 0; j < 2; j++) { assertEquals(1000, Duration.between(times.get(j), times.get(j + 1)).toMillis(), 100); } } + + ////////////// + // Utils + + private static CompletableFuture delayedErrorFuture() { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new RuntimeException("delayed error"); + }); + } }