Skip to content

Commit 156a744

Browse files
committed
implement JavaFxSubscriber.toLazyBinding()
1 parent 0b24214 commit 156a744

File tree

3 files changed

+98
-7
lines changed

3 files changed

+98
-7
lines changed

src/main/java/rx/observers/BindingSubscriber.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package rx.observers;
1818

1919
import com.sun.javafx.binding.ExpressionHelper;
20+
import io.reactivex.flowables.ConnectableFlowable;
2021
import io.reactivex.functions.Consumer;
2122
import javafx.beans.InvalidationListener;
2223
import javafx.beans.binding.Binding;
@@ -30,11 +31,18 @@
3031
final class BindingSubscriber<T> implements Subscriber<T>, ObservableValue<T>, Binding<T> {
3132

3233
private final Consumer<Throwable> onError;
34+
private final ConnectableFlowable<T> flowable;
35+
private boolean connected = false;
3336
private Subscription subscription;
3437
private ExpressionHelper<T> helper;
3538
private T value;
3639

3740
BindingSubscriber(Consumer<Throwable> onError) {
41+
this.flowable = null;
42+
this.onError = onError;
43+
}
44+
BindingSubscriber(ConnectableFlowable<T> flowable, Consumer<Throwable> onError) {
45+
this.flowable = flowable;
3846
this.onError = onError;
3947
}
4048

@@ -65,6 +73,10 @@ public void onNext(T t) {
6573
}
6674
@Override
6775
public T getValue() {
76+
if (!connected && flowable != null) {
77+
flowable.connect();
78+
connected = true;
79+
}
6880
return value;
6981
}
7082
@Override

src/main/java/rx/observers/JavaFxSubscriber.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import io.reactivex.Flowable;
2020
import io.reactivex.Observable;
21+
import io.reactivex.flowables.ConnectableFlowable;
2122
import io.reactivex.functions.Consumer;
23+
import io.reactivex.observables.ConnectableObservable;
2224
import javafx.beans.binding.Binding;
2325
public enum JavaFxSubscriber {
2426
;//no instances
@@ -27,16 +29,35 @@ public enum JavaFxSubscriber {
2729
* Turns a Flowable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
2830
*/
2931
public static <T> Binding<T> toBinding(Flowable<T> obs) {
30-
BindingSubscriber<T> bindingObserver = new BindingSubscriber<>(e -> {});
31-
obs.subscribe(bindingObserver);
32-
return bindingObserver;
32+
BindingSubscriber<T> bindingSubscriber = new BindingSubscriber<>(e -> {});
33+
obs.subscribe(bindingSubscriber);
34+
return bindingSubscriber;
3335
}
3436
/**
3537
* Turns a Flowable into an eager JavaFX Binding that subscribes immediately to the Observable. Calling the Binding's dispose() method will handle the unsubscription.
3638
*/
3739
public static <T> Binding<T> toBinding(Flowable<T> obs, Consumer<Throwable> onErrorAction ) {
38-
BindingSubscriber<T> bindingObserver = new BindingSubscriber<>(onErrorAction);
39-
obs.subscribe(bindingObserver);
40-
return bindingObserver;
40+
BindingSubscriber<T> bindingSubscriber = new BindingSubscriber<>(onErrorAction);
41+
obs.subscribe(bindingSubscriber);
42+
return bindingSubscriber;
43+
}
44+
45+
/**
46+
* Turns a Flowable into an lazy JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
47+
*/
48+
public static <T> Binding<T> toLazyBinding(Flowable<T> flowable) {
49+
ConnectableFlowable<T> published = flowable.publish();
50+
BindingSubscriber<T> bindingSubscriber = new BindingSubscriber<>(published, e -> {});
51+
published.subscribe(bindingSubscriber);
52+
return bindingSubscriber;
53+
}
54+
/**
55+
* Turns a Flowable into an eager JavaFX Binding that subscribes to the Observable when its getValue() is called. Calling the Binding's dispose() method will handle the unsubscription.
56+
*/
57+
public static <T> Binding<T> toLazyBinding(Flowable<T> flowable, Consumer<Throwable> onErrorAction ) {
58+
ConnectableFlowable<T> published = flowable.publish();
59+
BindingSubscriber<T> bindingSubscriber = new BindingSubscriber<>(published,onErrorAction);
60+
published.subscribe(bindingSubscriber);
61+
return bindingSubscriber;
4162
}
4263
}

src/test/java/rx/subscriptions/BindingTest.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package rx.subscriptions;
22

3+
import io.reactivex.Flowable;
34
import io.reactivex.Observable;
45
import javafx.application.Platform;
56
import javafx.beans.binding.Binding;
@@ -86,7 +87,64 @@ public void testObserverLazyBinding() {
8687

8788
sleep(1000);
8889

89-
System.out.println(emissionCount.get());
90+
assertTrue(emissionCount.get() == 0);
91+
92+
binding.getValue();
93+
94+
sleep(1000);
95+
96+
assertTrue(emissionCount.get() == 4);
97+
assertTrue(binding.getValue().equals("Delta"));
98+
latch.countDown();
99+
});
100+
101+
try {
102+
latch.await();
103+
} catch (InterruptedException e) {
104+
e.printStackTrace();
105+
}
106+
}
107+
108+
@Test
109+
public void testSubscriberBinding() {
110+
final CountDownLatch latch = new CountDownLatch(1);
111+
Platform.runLater(() -> {
112+
113+
final AtomicInteger emissionCount = new AtomicInteger(0);
114+
115+
Flowable<String> items =
116+
Flowable.just("Alpha", "Beta", "Gamma", "Delta")
117+
.doOnNext(s -> emissionCount.incrementAndGet());
118+
119+
Binding<String> binding = JavaFxSubscriber.toBinding(items);
120+
121+
assertTrue(emissionCount.get() == 4);
122+
assertTrue(binding.getValue().equals("Delta"));
123+
latch.countDown();
124+
});
125+
126+
try {
127+
latch.await();
128+
} catch (InterruptedException e) {
129+
e.printStackTrace();
130+
}
131+
}
132+
133+
@Test
134+
public void testSubscriberLazyBinding() {
135+
final CountDownLatch latch = new CountDownLatch(1);
136+
Platform.runLater(() -> {
137+
138+
final AtomicInteger emissionCount = new AtomicInteger(0);
139+
140+
Flowable<String> items =
141+
Flowable.just("Alpha", "Beta", "Gamma", "Delta")
142+
.doOnNext(s -> emissionCount.incrementAndGet());
143+
144+
Binding<String> binding = JavaFxSubscriber.toLazyBinding(items);
145+
146+
sleep(1000);
147+
90148
assertTrue(emissionCount.get() == 0);
91149

92150
binding.getValue();

0 commit comments

Comments
 (0)