Skip to content

Commit 0f16268

Browse files
committed
Separate Flowable and Observable Fx operators
1 parent 4142b11 commit 0f16268

File tree

2 files changed

+233
-2
lines changed

2 files changed

+233
-2
lines changed
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package rx.transformers;
2+
3+
import io.reactivex.FlowableOperator;
4+
import io.reactivex.FlowableTransformer;
5+
import io.reactivex.exceptions.Exceptions;
6+
import io.reactivex.functions.Action;
7+
import io.reactivex.functions.Consumer;
8+
import io.reactivex.subscribers.ResourceSubscriber;
9+
import javafx.application.Platform;
10+
import org.reactivestreams.Subscriber;
11+
import org.reactivestreams.Subscription;
12+
13+
14+
public class FxFlowableTransformers {
15+
16+
private FxFlowableTransformers() {}
17+
18+
private static <T> void runOnFx(T t, Consumer<T> consumer) {
19+
Platform.runLater(() -> {
20+
try {
21+
consumer.accept(t);
22+
} catch (Throwable e) {
23+
throw Exceptions.propagate(e);
24+
}
25+
}
26+
);
27+
}
28+
29+
private static <T> void runOnFx(Action action) {
30+
Platform.runLater(() -> {
31+
try {
32+
action.run();
33+
} catch (Throwable e) {
34+
throw Exceptions.propagate(e);
35+
}
36+
}
37+
);
38+
}
39+
40+
/**
41+
* Performs a given action for each item on the FX thread
42+
* @param onNext
43+
* @param <T>
44+
*/
45+
public static <T> FlowableTransformer<T,T> doOnNextFx(Consumer<T> onNext) {
46+
return obs -> obs.doOnNext(t -> runOnFx(t, onNext));
47+
}
48+
49+
/**
50+
* Performs a given action on a Throwable on the FX thread in the event of an onError
51+
* @param onError
52+
* @param <T>
53+
*/
54+
public static <T> FlowableTransformer<T,T> doOnErrorFx(Consumer<Throwable> onError) {
55+
return obs -> obs.doOnError(e -> runOnFx(e,onError));
56+
}
57+
58+
/**
59+
* Performs a given Action on the FX thread when onCompleted is called
60+
* @param onCompleted
61+
* @param <T>
62+
*/
63+
public static <T> FlowableTransformer<T,T> doOnCompleteFx(Action onCompleted) {
64+
return obs -> obs.doOnComplete(() -> runOnFx(onCompleted));
65+
}
66+
67+
/**
68+
* Performs a given Action on the FX thread when subscribed to
69+
* @param subscribe
70+
* @param <T>
71+
*/
72+
public static <T> FlowableTransformer<T,T> doOnSubscribeFx(Consumer<Subscription> subscribe) {
73+
return obs -> obs.doOnSubscribe((d -> runOnFx(d,subscribe)));
74+
}
75+
76+
/**
77+
* Performs the provided onTerminate action on the FX thread
78+
* @param onTerminate
79+
* @param <T>
80+
*/
81+
public static <T> FlowableTransformer<T,T> doOnTerminateFx(Action onTerminate) {
82+
return obs -> obs.doOnTerminate(() -> runOnFx(onTerminate));
83+
}
84+
85+
/**
86+
* Performs the provided onTerminate action on the FX thread
87+
* @param onDipsose
88+
* @param <T>
89+
*/
90+
public static <T> FlowableTransformer<T,T> doOnCancelFx(Action onDipsose) {
91+
return obs -> obs.doOnCancel(() -> runOnFx(onDipsose));
92+
}
93+
94+
/**
95+
* Performs an action on onNext with the provided emission count
96+
* @param onNext
97+
* @param <T>
98+
*/
99+
public static <T> FlowableTransformer<T,T> doOnNextCount(Consumer<Integer> onNext) {
100+
return obs -> obs.lift(new FlowableEmissionCounter<>(new CountObserver(onNext,null,null)));
101+
}
102+
103+
/**
104+
* Performs an action on onComplete with the provided emission count
105+
* @param onComplete
106+
* @param <T>
107+
*/
108+
public static <T> FlowableTransformer<T,T> doOnCompleteCount(Consumer<Integer> onComplete) {
109+
return obs -> obs.lift(new FlowableEmissionCounter<>(new CountObserver(null,onComplete,null)));
110+
}
111+
112+
/**
113+
* Performs an action on onError with the provided emission count
114+
* @param onError
115+
* @param <T>
116+
*/
117+
public static <T> FlowableTransformer<T,T> doOnErrorCount(Consumer<Integer> onError) {
118+
return obs -> obs.lift(new FlowableEmissionCounter<>(new CountObserver(null,null,onError)));
119+
}
120+
121+
/**
122+
* Performs an action on FX thread on onNext with the provided emission count
123+
* @param onNext
124+
* @param <T>
125+
*/
126+
public static <T> FlowableTransformer<T,T> doOnNextCountFx(Consumer<Integer> onNext) {
127+
return obs -> obs.compose(doOnNextCount(i -> runOnFx(i,onNext)));
128+
}
129+
130+
/**
131+
* Performs an action on FX thread on onCompleted with the provided emission count
132+
* @param onComplete
133+
* @param <T>
134+
*/
135+
public static <T> FlowableTransformer<T,T> doOnCompleteCountFx(Consumer<Integer> onComplete) {
136+
return obs -> obs.compose(doOnCompleteCount(i -> runOnFx(i,onComplete)));
137+
}
138+
139+
/**
140+
* Performs an action on FX thread on onError with the provided emission count
141+
* @param onError
142+
* @param <T>
143+
*/
144+
public static <T> FlowableTransformer<T,T> doOnErrorCountFx(Consumer<Integer> onError) {
145+
return obs -> obs.compose(doOnErrorCount(i -> runOnFx(i,onError)));
146+
}
147+
148+
149+
private static class FlowableEmissionCounter<T> implements FlowableOperator<T,T> {
150+
151+
private final CountObserver ctObserver;
152+
153+
FlowableEmissionCounter(CountObserver ctObserver) {
154+
this.ctObserver = ctObserver;
155+
}
156+
157+
@Override
158+
public Subscriber<? super T> apply(Subscriber<? super T> child) {
159+
160+
return new ResourceSubscriber<T>() {
161+
private int count = 0;
162+
private boolean done = false;
163+
164+
@Override
165+
protected void onStart() {
166+
super.onStart();
167+
request(Long.MAX_VALUE);
168+
}
169+
170+
@Override
171+
public void onComplete() {
172+
if (done)
173+
return;
174+
175+
try {
176+
if (ctObserver.doOnCompletedCountAction != null)
177+
ctObserver.doOnCompletedCountAction.accept(count);
178+
} catch (Exception e) {
179+
Exceptions.throwIfFatal(e);
180+
onError(e);
181+
return;
182+
}
183+
done = true;
184+
child.onComplete();
185+
}
186+
187+
@Override
188+
public void onError(Throwable e) {
189+
if (done)
190+
return;
191+
try {
192+
if (ctObserver.doOnErrorCountAction != null)
193+
ctObserver.doOnErrorCountAction.accept(count);
194+
} catch(Exception e1) {
195+
Exceptions.throwIfFatal(e1);
196+
child.onError(e1);
197+
}
198+
}
199+
200+
@Override
201+
public void onNext(T t) {
202+
if (done)
203+
return;
204+
try {
205+
if (ctObserver.doOnNextCountAction != null)
206+
ctObserver.doOnNextCountAction.accept(++count);
207+
} catch(Exception e) {
208+
Exceptions.throwIfFatal(e);
209+
onError(e);
210+
return;
211+
}
212+
child.onNext(t);
213+
request(Long.MAX_VALUE);
214+
}
215+
};
216+
}
217+
}
218+
219+
private static final class CountObserver {
220+
private final Consumer<Integer> doOnNextCountAction;
221+
private final Consumer<Integer> doOnCompletedCountAction;
222+
private final Consumer<Integer> doOnErrorCountAction;
223+
224+
CountObserver(Consumer<Integer> doOnNextCountAction, Consumer<Integer> doOnCompletedCountAction, Consumer<Integer> doOnErrorCountAction) {
225+
this.doOnNextCountAction = doOnNextCountAction;
226+
this.doOnCompletedCountAction = doOnCompletedCountAction;
227+
this.doOnErrorCountAction = doOnErrorCountAction;
228+
}
229+
}
230+
231+
}

src/main/java/rx/transformers/JavaFxTransformers.java renamed to src/main/java/rx/transformers/FxObservableTransformers.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
import javafx.application.Platform;
1212

1313

14-
public final class JavaFxTransformers {
15-
private JavaFxTransformers() {}
14+
public final class FxObservableTransformers {
15+
private FxObservableTransformers() {}
1616

1717
private static <T> void runOnFx(T t, Consumer<T> consumer) {
1818
Platform.runLater(() -> {

0 commit comments

Comments
 (0)