Skip to content

Commit e0129b3

Browse files
committed
implement JavaFxTransformers
1 parent 3c3801b commit e0129b3

File tree

1 file changed

+201
-0
lines changed

1 file changed

+201
-0
lines changed
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package rx.transformers;
2+
3+
import javafx.application.Platform;
4+
import rx.Observable;
5+
import rx.Producer;
6+
import rx.Subscriber;
7+
import rx.exceptions.Exceptions;
8+
import rx.functions.Action0;
9+
import rx.functions.Action1;
10+
11+
public final class JavaFxTransformers {
12+
private JavaFxTransformers() {}
13+
14+
/**
15+
* Performs a given action for each item on the FX thread
16+
* @param onNext
17+
* @param <T>
18+
*/
19+
public static <T> Observable.Transformer<T,T> doOnNextFx(Action1<T> onNext) {
20+
return obs -> obs.doOnNext(t -> Platform.runLater(() -> onNext.call(t)));
21+
}
22+
23+
/**
24+
* Performs a given action on a Throwable on the FX thread in the event of an onError
25+
* @param onError
26+
* @param <T>
27+
*/
28+
public static <T> Observable.Transformer<T,T> doOnErrorFx(Action1<Throwable> onError) {
29+
return obs -> obs.doOnError(e -> Platform.runLater(() -> onError.call(e)));
30+
}
31+
32+
/**
33+
* Performs a given Action0 on the FX thread when onCompleted is called
34+
* @param onCompleted
35+
* @param <T>
36+
*/
37+
public static <T> Observable.Transformer<T,T> doOnCompletedFx(Action0 onCompleted) {
38+
return obs -> obs.doOnCompleted(() -> Platform.runLater(onCompleted::call));
39+
}
40+
41+
/**
42+
* Performs a given Action0 on the FX thread when subscribed to
43+
* @param subscribe
44+
* @param <T>
45+
*/
46+
public static <T> Observable.Transformer<T,T> doOnSubscribeFx(Action0 subscribe) {
47+
return obs -> obs.doOnSubscribe((() -> Platform.runLater(subscribe::call)));
48+
}
49+
50+
/**
51+
* Performs the provided onTerminate action on the FX thread
52+
* @param onTerminate
53+
* @param <T>
54+
*/
55+
public static <T> Observable.Transformer<T,T> doOnTerminateFx(Action0 onTerminate) {
56+
return obs -> obs.doOnTerminate(() -> Platform.runLater(onTerminate::call));
57+
}
58+
59+
/**
60+
* Performs the provided onTerminate action on the FX thread
61+
* @param onUnsubscribe
62+
* @param <T>
63+
*/
64+
public static <T> Observable.Transformer<T,T> doOnUnsubscribeFx(Action0 onUnsubscribe) {
65+
return obs -> obs.doOnUnsubscribe(() -> Platform.runLater(onUnsubscribe::call));
66+
}
67+
68+
/**
69+
* Performs an action on onNext with the provided emission count
70+
* @param onNext
71+
* @param <T>
72+
*/
73+
public static <T> Observable.Transformer<T,T> doOnNextCount(Action1<Integer> onNext) {
74+
return obs -> obs.lift(new OperatorEmissionCounter<>(new CountObserver(onNext,null,null)));
75+
}
76+
77+
/**
78+
* Performs an action on onCompleted with the provided emission count
79+
* @param onCompleted
80+
* @param <T>
81+
*/
82+
public static <T> Observable.Transformer<T,T> doOnCompletedCount(Action1<Integer> onCompleted) {
83+
return obs -> obs.lift(new OperatorEmissionCounter<>(new CountObserver(null,onCompleted,null)));
84+
}
85+
86+
/**
87+
* Performs an action on onError with the provided emission count
88+
* @param onError
89+
* @param <T>
90+
*/
91+
public static <T> Observable.Transformer<T,T> doOnErrorCount(Action1<Integer> onError) {
92+
return obs -> obs.lift(new OperatorEmissionCounter<>(new CountObserver(null,null,onError)));
93+
}
94+
95+
/**
96+
* Performs an action on FX thread on onNext with the provided emission count
97+
* @param onNext
98+
* @param <T>
99+
*/
100+
public static <T> Observable.Transformer<T,T> doOnNextCountFx(Action1<Integer> onNext) {
101+
return obs -> obs.compose(doOnNextCount(i -> Platform.runLater(() -> onNext.call(i))));
102+
}
103+
104+
/**
105+
* Performs an action on FX thread on onCompleted with the provided emission count
106+
* @param onCompleted
107+
* @param <T>
108+
*/
109+
public static <T> Observable.Transformer<T,T> doOnCompletedCountFx(Action1<Integer> onCompleted) {
110+
return obs -> obs.compose(doOnCompletedCount(i -> Platform.runLater(() -> onCompleted.call(i))));
111+
}
112+
113+
/**
114+
* Performs an action on FX thread on onError with the provided emission count
115+
* @param onError
116+
* @param <T>
117+
*/
118+
public static <T> Observable.Transformer<T,T> doOnErrorCountFx(Action1<Integer> onError) {
119+
return obs -> obs.compose(doOnErrorCount(i -> Platform.runLater(() -> onError.call(i))));
120+
}
121+
122+
123+
private static class OperatorEmissionCounter<T> implements Observable.Operator<T,T> {
124+
125+
private final CountObserver ctObserver;
126+
127+
OperatorEmissionCounter(CountObserver ctObserver) {
128+
this.ctObserver = ctObserver;
129+
}
130+
131+
@Override
132+
public Subscriber<? super T> call(Subscriber<? super T> child) {
133+
134+
return new Subscriber<T>() {
135+
private int count = 0;
136+
private boolean done = false;
137+
138+
@Override
139+
public void onCompleted() {
140+
if (done)
141+
return;
142+
143+
try {
144+
if (ctObserver.doOnCompletedCountAction != null)
145+
ctObserver.doOnCompletedCountAction.call(count);
146+
} catch (Exception e) {
147+
Exceptions.throwIfFatal(e);
148+
onError(e);
149+
return;
150+
}
151+
done = true;
152+
child.onCompleted();
153+
}
154+
155+
@Override
156+
public void onError(Throwable e) {
157+
if (done)
158+
return;
159+
try {
160+
if (ctObserver.doOnErrorCountAction != null)
161+
ctObserver.doOnErrorCountAction.call(count);
162+
} catch(Exception e1) {
163+
Exceptions.throwIfFatal(e1);
164+
child.onError(e1);
165+
}
166+
}
167+
168+
@Override
169+
public void onNext(T t) {
170+
if (done)
171+
return;
172+
try {
173+
if (ctObserver.doOnNextCountAction != null)
174+
ctObserver.doOnNextCountAction.call(++count);
175+
} catch(Exception e) {
176+
Exceptions.throwIfFatal(e);
177+
onError(e);
178+
return;
179+
}
180+
child.onNext(t);
181+
}
182+
183+
@Override
184+
public void setProducer(Producer p) {
185+
child.setProducer(p);
186+
}
187+
};
188+
}
189+
}
190+
private static final class CountObserver {
191+
private final Action1<Integer> doOnNextCountAction;
192+
private final Action1<Integer> doOnCompletedCountAction;
193+
private final Action1<Integer> doOnErrorCountAction;
194+
195+
CountObserver(Action1<Integer> doOnNextCountAction, Action1<Integer> doOnCompletedCountAction, Action1<Integer> doOnErrorCountAction) {
196+
this.doOnNextCountAction = doOnNextCountAction;
197+
this.doOnCompletedCountAction = doOnCompletedCountAction;
198+
this.doOnErrorCountAction = doOnErrorCountAction;
199+
}
200+
}
201+
}

0 commit comments

Comments
 (0)