Skip to content

Commit ffd3e74

Browse files
committed
Implement ObservableSet source
1 parent 84c947d commit ffd3e74

File tree

3 files changed

+133
-3
lines changed

3 files changed

+133
-3
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package rx.javafx.sources;
2+
3+
import javafx.collections.ObservableSet;
4+
import javafx.collections.SetChangeListener;
5+
import rx.Observable;
6+
import rx.schedulers.JavaFxScheduler;
7+
import rx.subscriptions.JavaFxSubscriptions;
8+
9+
public final class ObservableSetSource {
10+
private ObservableSetSource() {}
11+
12+
public static <T> Observable<ObservableSet<T>> fromObservableSet(final ObservableSet<T> source) {
13+
14+
return Observable.create((Observable.OnSubscribe<ObservableSet<T>>) subscriber -> {
15+
SetChangeListener<T> listener = c -> subscriber.onNext(source);
16+
source.addListener(listener);
17+
subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(() -> source.removeListener(listener)));
18+
}).subscribeOn(JavaFxScheduler.getInstance());
19+
}
20+
21+
public static <T> Observable<T> fromObservableSetAdds(final ObservableSet<T> source) {
22+
23+
return Observable.create((Observable.OnSubscribe<T>) subscriber -> {
24+
25+
SetChangeListener<T> listener = c -> {
26+
if (c.wasAdded()) {
27+
subscriber.onNext(c.getElementAdded());
28+
}
29+
};
30+
source.addListener(listener);
31+
subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(() -> source.removeListener(listener)));
32+
33+
}).subscribeOn(JavaFxScheduler.getInstance());
34+
}
35+
36+
public static <T> Observable<T> fromObservableSetRemovals(final ObservableSet<T> source) {
37+
38+
return Observable.create((Observable.OnSubscribe<T>) subscriber -> {
39+
40+
SetChangeListener<T> listener = c -> {
41+
if (c.wasRemoved()) {
42+
subscriber.onNext(c.getElementRemoved());
43+
}
44+
};
45+
source.addListener(listener);
46+
subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(() -> source.removeListener(listener)));
47+
48+
}).subscribeOn(JavaFxScheduler.getInstance());
49+
}
50+
51+
public static <T> Observable<SetChange<T>> fromObservableSetChanges(final ObservableSet<T> source) {
52+
53+
return Observable.create((Observable.OnSubscribe<SetChange<T>>) subscriber -> {
54+
55+
SetChangeListener<T> listener = c -> {
56+
if (c.wasRemoved()) {
57+
subscriber.onNext(new SetChange<>(c.getElementRemoved(), Flag.REMOVED));
58+
}
59+
if (c.wasAdded()) {
60+
subscriber.onNext(new SetChange<>(c.getElementAdded(), Flag.ADDED));
61+
}
62+
};
63+
source.addListener(listener);
64+
subscriber.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(() -> source.removeListener(listener)));
65+
66+
}).subscribeOn(JavaFxScheduler.getInstance());
67+
}
68+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package rx.javafx.sources;
2+
3+
public final class SetChange<T> {
4+
private final T value;
5+
private final Flag flag;
6+
7+
SetChange(T value, Flag flag) {
8+
this.value = value;
9+
this.flag = flag;
10+
}
11+
public T getValue() {
12+
return value;
13+
}
14+
public Flag getFlag() {
15+
return flag;
16+
}
17+
@Override
18+
public String toString() {
19+
return flag + " " + value;
20+
}
21+
}

src/main/java/rx/observables/JavaFxObservable.java

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import javafx.beans.value.ObservableValue;
2020
import javafx.collections.ObservableList;
2121
import javafx.collections.ObservableMap;
22+
import javafx.collections.ObservableSet;
2223
import javafx.event.ActionEvent;
2324
import javafx.event.Event;
2425
import javafx.event.EventType;
@@ -226,18 +227,58 @@ public static <K,T> Observable<Map.Entry<K,T>> fromObservableMapAdds(final Obser
226227
* Creates an observable that emits all removal items from an ObservableMap
227228
*
228229
* @param source The target ObservableMap for the item removal events
229-
* @return An Observable emitting items removed Entry items from the ObservableList
230+
* @return An Observable emitting items removed Entry items from the ObservableMap
230231
*/
231232
public static <K,T> Observable<Map.Entry<K,T>> fromObservableMapRemovals(final ObservableMap<K,T> source) {
232233
return ObservableMapSource.fromObservableMapRemovals(source);
233234
}
234235

235236
/**
236-
* Emits all added, removed, and updated items from an ObservableMap
237+
* Emits all added and removed items (including swaps) from an ObservableMap
237238
* @param source
238-
* @return An Observable emitting changed entries with an ADDED, REMOVED, or UPDATED flag
239+
* @return An Observable emitting changed entries with an ADDED or REMOVED flag
239240
*/
240241
public static <K,T> Observable<MapChange<K,T>> fromObservableMapChanges(final ObservableMap<K,T> source) {
241242
return ObservableMapSource.fromObservableMapChanges(source);
242243
}
244+
245+
/**
246+
* Creates an observable that emits an ObservableSet every time it is modified
247+
*
248+
* @param source The target ObservableSet of the SetChange events
249+
* @return An Observable emitting the ObservableSet each time it changes
250+
*/
251+
public static <T> Observable<ObservableSet<T>> fromObservableSet(final ObservableSet<T> source) {
252+
return ObservableSetSource.fromObservableSet(source);
253+
}
254+
255+
/**
256+
* Creates an observable that emits all additions to an ObservableSet
257+
*
258+
* @param source The target ObservableSet for the item add events
259+
* @return An Observable emitting items added to the ObservableSet
260+
*/
261+
public static <T> Observable<T> fromObservableSetAdds(final ObservableSet<T> source) {
262+
return ObservableSetSource.fromObservableSetAdds(source);
263+
}
264+
265+
/**
266+
* Creates an observable that emits all removal items from an ObservableSet
267+
*
268+
* @param source The target ObservableSet for the item removal events
269+
* @return An Observable emitting items removed items from the ObservableSet
270+
*/
271+
public static <T> Observable<T> fromObservableSetRemovals(final ObservableSet<T> source) {
272+
return ObservableSetSource.fromObservableSetRemovals(source);
273+
}
274+
275+
276+
/**
277+
* Emits all added and removed items (including swaps) from an ObservableSet
278+
* @param source
279+
* @return An Observable emitting changed entries with an ADDED or REMOVED flag
280+
*/
281+
public static <T> Observable<SetChange<T>> fromObservableSetChanges(final ObservableSet<T> source) {
282+
return ObservableSetSource.fromObservableSetChanges(source);
283+
}
243284
}

0 commit comments

Comments
 (0)