44import javafx .beans .value .ObservableValue ;
55import rx .Observable ;
66import rx .Subscriber ;
7- import rx .functions .Action0 ;
87import rx .subscriptions .JavaFxSubscriptions ;
98
109public class ObservableValueSource {
@@ -18,25 +17,31 @@ public static <T> Observable<T> fromObservableValue(final ObservableValue<T> fxO
1817 public void call (final Subscriber <? super T > subscriber ) {
1918 subscriber .onNext (fxObservable .getValue ());
2019
21- final ChangeListener <T > listener = new ChangeListener <T >() {
22- @ Override
23- public void changed (final ObservableValue <? extends T > observableValue , final T prev , final T current ) {
24- subscriber .onNext (current );
25- }
26- };
20+ final ChangeListener <T > listener = (observableValue , prev , current ) -> subscriber .onNext (current );
2721
2822 fxObservable .addListener (listener );
2923
30- subscriber .add (JavaFxSubscriptions .unsubscribeInEventDispatchThread (new Action0 () {
31- @ Override
32- public void call () {
33- fxObservable .removeListener (listener );
34- }
35- }));
24+ subscriber .add (JavaFxSubscriptions .unsubscribeInEventDispatchThread (() -> fxObservable .removeListener (listener )));
3625
3726 }
3827 });
3928 }
29+ /**
30+ * @see rx.observables.JavaFxObservable#fromObservableValue
31+ */
32+ public static <T > Observable <Change <T >> fromObservableValueChanges (final ObservableValue <T > fxObservable ) {
33+ return Observable .create (new Observable .OnSubscribe <Change <T >>() {
34+ @ Override
35+ public void call (final Subscriber <? super Change <T >> subscriber ) {
36+
37+ final ChangeListener <T > listener = (observableValue , prev , current ) -> subscriber .onNext (new Change <>(prev ,current ));
4038
39+ fxObservable .addListener (listener );
40+
41+ subscriber .add (JavaFxSubscriptions .unsubscribeInEventDispatchThread (() -> fxObservable .removeListener (listener )));
42+
43+ }
44+ });
45+ }
4146
4247}
0 commit comments