1616package rx .javafx .sources ;
1717
1818import javafx .collections .FXCollections ;
19- import javafx .collections .ObservableList ;
19+ import javafx .collections .ObservableSet ;
2020import rx .Observable ;
2121import rx .observables .JavaFxObservable ;
22+ import java .util .HashSet ;
23+ import java .util .Arrays ;
2224
2325/**
2426 * A CompositeObservable can merge multiple Observables that can be added/removed at any time,
2931 */
3032public final class CompositeObservable <T > {
3133
32- private final ObservableList <Observable <T >> sources ;
34+ private final ObservableSet <Observable <T >> sources ;
3335 private final int initialCapacity ;
3436
3537 public CompositeObservable () {
@@ -38,13 +40,13 @@ public CompositeObservable() {
3840
3941 public CompositeObservable (int initialCapacity ) {
4042 this .initialCapacity = initialCapacity ;
41- sources = FXCollections .synchronizedObservableList (FXCollections .observableArrayList ( ));
43+ sources = FXCollections .synchronizedObservableSet (FXCollections .observableSet ( new HashSet <>() ));
4244 }
4345
4446 public Observable <T > toObservable () {
4547 Observable <T > updatingSource = Observable .merge (
4648 Observable .from (sources ).flatMap (obs -> obs .takeWhile (v -> sources .contains (obs ))),
47- JavaFxObservable .fromObservableListAdds (sources ).flatMap (obs -> obs .takeWhile (v -> sources .contains (obs )))
49+ JavaFxObservable .fromObservableSetAdds (sources ).flatMap (obs -> obs .takeWhile (v -> sources .contains (obs )))
4850 );
4951
5052 if (initialCapacity > 0 ) {
@@ -57,7 +59,17 @@ public Observable<T> toObservable() {
5759 public void add (Observable <T > observable ) {
5860 sources .add (observable );
5961 }
62+ public void addAll (Observable <T >... observables ) {
63+ Arrays .stream (observables ).forEach (this ::add );
64+ }
6065 public void remove (Observable <T > observable ) {
6166 sources .remove (observable );
6267 }
68+ public void removeAll (Observable <T >... observables ) {
69+ Arrays .stream (observables ).forEach (this ::remove );
70+ }
71+ public void clear () {
72+ sources .clear ();
73+ }
74+
6375}
0 commit comments