Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 57 additions & 28 deletions src/main/java/com/autonomouslogic/commons/rxjava3/Rx3Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
import java.time.Duration;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -42,14 +44,23 @@ private Rx3Util() {}
* @param <T> the return parameter of the future
*/
public static <T> Single<T> toSingle(CompletionStage<T> future) {
return Single.create(subscriber -> {
future.thenAccept(result -> {
subscriber.onSuccess(result);
})
.exceptionally(e -> {
subscriber.onError(e);
return null;
});
return Single.create(emitter -> {
future.whenComplete((result, throwable) -> {
if (!emitter.isDisposed()) {
if (throwable != null) {
emitter.onError(new ExecutionException(throwable));
} else if (result != null) {
emitter.onSuccess(result);
} else {
emitter.onError(new NullPointerException("CompletionStage completed with a null result"));
}
}
});
emitter.setCancellable(() -> {
if (future instanceof CompletableFuture<?>) {
((CompletableFuture<?>) future).cancel(false);
}
});
});
}

Expand All @@ -65,18 +76,23 @@ public static <T> Single<T> toSingle(CompletionStage<T> future) {
* @param <T> the return parameter of the future
*/
public static <T> Maybe<T> toMaybe(CompletionStage<T> future) {
return Maybe.create(subscriber -> {
future.thenAccept(result -> {
if (result == null) {
subscriber.onComplete();
} else {
subscriber.onSuccess(result);
}
})
.exceptionally(e -> {
subscriber.onError(e);
return null;
});
return Maybe.create(emitter -> {
future.whenComplete((result, throwable) -> {
if (!emitter.isDisposed()) {
if (throwable != null) {
emitter.onError(new ExecutionException(throwable));
} else if (result != null) {
emitter.onSuccess(result);
} else {
emitter.onComplete();
}
}
});
emitter.setCancellable(() -> {
if (future instanceof CompletableFuture<?>) {
((CompletableFuture<?>) future).cancel(false);
}
});
});
}

Expand All @@ -89,14 +105,27 @@ public static <T> Maybe<T> toMaybe(CompletionStage<T> future) {
* @return the Completable
*/
public static Completable toCompletable(CompletionStage<Void> future) {
return Completable.create(subscriber -> {
future.thenAccept(ignore -> {
subscriber.onComplete();
})
.exceptionally(e -> {
subscriber.onError(e);
return null;
});
return Completable.create(emitter -> {
future.whenComplete((result, throwable) -> {
if (!emitter.isDisposed()) {
if (throwable != null) {
// Emit only the cause message
if (throwable.getCause() != null) {
emitter.onError(new ExecutionException(throwable));
} else {
emitter.onError(throwable);
}
} else {
emitter.onComplete();
}
}
});

emitter.setCancellable(() -> {
if (future instanceof CompletableFuture<?>) {
((CompletableFuture<?>) future).cancel(false);
}
});
});
}

Expand Down
162 changes: 152 additions & 10 deletions src/test/java/com/autonomouslogic/commons/rxjava3/Rx3UtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.time.Duration;
Expand All @@ -19,8 +23,11 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class Rx3UtilTest {
Expand All @@ -29,21 +36,69 @@ class Rx3UtilTest {

RuntimeException textEx = new RuntimeException("test error");

static final AtomicReference<Throwable> caughtError = new AtomicReference<>();

@BeforeAll
static void beforeAll() {
RxJavaPlugins.setErrorHandler(caughtError::set);
}

@BeforeEach
void setup() {
caughtError.set(null);
}

//////////////
// toSingle()

@Test
void shouldConvertCompletionStageToSingle() {
var future = CompletableFuture.completedStage("test");
var result = Rx3Util.toSingle(future).blockingGet();
assertEquals("test", result);
}

@Test
void confirmCatchSingleFromFuture() {
// This test confirms how RxJava's fromFuture works, so it can be replicated below
var future = CompletableFuture.failedFuture(textEx);
var single = Single.fromFuture(future);
var ex = assertThrows(RuntimeException.class, single::blockingGet);
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
void shouldCatchCompletionStageErrorsToSingle() {
var future = CompletableFuture.failedStage(textEx);
var future = CompletableFuture.failedFuture(textEx);
var single = Rx3Util.toSingle(future);
var ex = assertThrows(RuntimeException.class, single::blockingGet);
assertEquals("java.lang.RuntimeException: test error", ex.getMessage());
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
@SneakyThrows
void shouldCancelSingle() {
var future = delayedErrorFuture();
var single = Rx3Util.toSingle(future);
var disposable = single.subscribe();
disposable.dispose();
assertTrue(disposable.isDisposed());
assertTrue(future.isCancelled());
Thread.sleep(200);
assertNull(caughtError.get());
}

//////////////
// toMaybe()

@Test
void shouldConvertCompletionStageToMaybe() {
var future = CompletableFuture.completedStage("test");
Expand All @@ -58,14 +113,46 @@ void shouldConvertCompletionStageNullResultToMaybe() {
assertEquals("empty", result);
}

@Test
void confirmCatchMaybeFromFuture() {
// This test confirms how RxJava's fromFuture works, so it can be replicated below
var future = CompletableFuture.failedFuture(textEx);
var maybe = Maybe.fromFuture(future);
var ex = assertThrows(RuntimeException.class, maybe::blockingGet);
ex.printStackTrace();
assertEquals("test error", ex.getMessage());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why RxJava's Maybe.fromFuture doesn't wrap it like the others do. I opted to wrap it below to have consistency.


assertNull(caughtError.get());
}

@Test
void shouldCatchCompletionStageErrorsToMaybe() {
var future = CompletableFuture.failedStage(textEx);
var future = CompletableFuture.failedFuture(textEx);
var maybe = Rx3Util.toMaybe(future);
var ex = assertThrows(RuntimeException.class, maybe::blockingGet);
assertEquals("java.lang.RuntimeException: test error", ex.getMessage());
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
@SneakyThrows
void shouldCancelMaybe() {
var future = delayedErrorFuture();
var maybe = Rx3Util.toMaybe(future);
var disposable = maybe.subscribe();
disposable.dispose();
assertTrue(disposable.isDisposed());
assertTrue(future.isCancelled());
Thread.sleep(200);
assertNull(caughtError.get());
}

//////////////
// toCompletable()

@Test
void shouldConvertCompletionStageToCompletable() {
var future = CompletableFuture.completedStage((Void) null);
Expand All @@ -74,16 +161,52 @@ void shouldConvertCompletionStageToCompletable() {
assertTrue(complete.get());
}

@Test
void confirmCatchCompletableFromFuture() {
// This test confirms how RxJava's fromFuture works, so it can be replicated below
var future = CompletableFuture.runAsync(() -> {
throw textEx;
});
var completable = Completable.fromFuture(future);
var ex = assertThrows(RuntimeException.class, completable::blockingAwait);
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.lang.RuntimeException: test error", ex.getMessage());

assertNull(caughtError.get());
}

@Test
void shouldCatchCompletionStageErrorsToCompletable() {
var future = CompletableFuture.supplyAsync((Supplier<Void>) () -> {
var future = CompletableFuture.runAsync(() -> {
throw textEx;
});
var completable = Rx3Util.toCompletable(future);
var ex = assertThrows(RuntimeException.class, completable::blockingAwait);
assertEquals("java.lang.RuntimeException: test error", ex.getMessage());
ex.printStackTrace();
assertEquals(
"java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.lang.RuntimeException: test error",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't match exactly, but it's good enough

ex.getMessage());

assertNull(caughtError.get());
}

@Test
@SneakyThrows
void shouldCancelCompletable() {
var future = delayedErrorFuture();
var completable = Rx3Util.toCompletable(future);
var disposable = completable.subscribe();
disposable.dispose();
assertTrue(disposable.isDisposed());
assertTrue(future.isCancelled());
Thread.sleep(200);
assertNull(caughtError.get());
}

//////////////
// wrapTransformerErrors()

@Test
void shouldWrapObservableTransformerErrors() {
var observable = Observable.just("result")
Expand Down Expand Up @@ -180,6 +303,9 @@ void shouldNotBlockResultsWhenWrappingFlowableTransformer() {
assertEquals("result", result);
}

//////////////
// Others

@Test
@SneakyThrows
void shouldZipAll() {
Expand Down Expand Up @@ -241,8 +367,9 @@ void shouldErrorCheckOrderOnUnorderedStuff() {
var result = Flowable.fromIterable(ints)
.compose(Rx3Util.checkOrder(Integer::compareTo))
.toList();
var e = assertThrows(RuntimeException.class, () -> result.blockingGet());
assertEquals("Stream isn't ordered - last: 5, current: 4", e.getMessage());
var ex = assertThrows(RuntimeException.class, () -> result.blockingGet());
ex.printStackTrace();
assertEquals("Stream isn't ordered - last: 5, current: 4", ex.getMessage());
}

@Test
Expand Down Expand Up @@ -281,12 +408,27 @@ void shouldRetryWithDelayPredicate() {
.compose(Rx3Util.retryWithDelayFlowable(
100, Duration.ofSeconds(1), e -> !e.getMessage().equals("test error 2")))
.blockingFirst();
var e = assertThrows(RuntimeException.class, func::get);
assertEquals("test error 2", e.getMessage());
var ex = assertThrows(RuntimeException.class, func::get);
ex.printStackTrace();
assertEquals("test error 2", ex.getMessage());
assertEquals(3, count.get());
assertEquals(3, times.size());
for (int j = 0; j < 2; j++) {
assertEquals(1000, Duration.between(times.get(j), times.get(j + 1)).toMillis(), 100);
}
}

//////////////
// Utils

private static CompletableFuture<Void> delayedErrorFuture() {
return CompletableFuture.runAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
throw new RuntimeException("delayed error");
});
}
}