Skip to content

Commit fb28c7b

Browse files
committed
Implement CompositeObservable
1 parent 28fdb3e commit fb28c7b

File tree

1 file changed

+62
-0
lines changed

1 file changed

+62
-0
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.javafx.sources;
17+
18+
import javafx.collections.FXCollections;
19+
import javafx.collections.ObservableList;
20+
import rx.Observable;
21+
import rx.observables.JavaFxObservable;
22+
23+
/**
24+
* A CompositeObservable can merge multiple Observables that can be added/removed at any time,
25+
* affecting all Subscribers regardless of when they subscribed. This is especailly helpful for merging
26+
* multiple UI event sources.
27+
* @param <T>
28+
*/
29+
public final class CompositeObservable<T> {
30+
31+
private final ObservableList<Observable<T>> sources;
32+
private final Observable<T> observable;
33+
34+
public CompositeObservable() {
35+
this(-1);
36+
}
37+
38+
public CompositeObservable(int initialCapacity) {
39+
sources = FXCollections.synchronizedObservableList(FXCollections.observableArrayList());
40+
41+
Observable<T> observable = JavaFxObservable.fromObservableList(sources)
42+
.switchMap(list -> Observable.from(list).flatMap((Observable<T> obs) -> obs));
43+
44+
if (initialCapacity > 0) {
45+
this.observable = observable.cacheWithInitialCapacity(initialCapacity);
46+
}
47+
else {
48+
this.observable = observable;
49+
}
50+
}
51+
52+
public Observable<T> toObservable() {
53+
return observable;
54+
}
55+
56+
public void add(Observable<T> observable) {
57+
sources.add(observable);
58+
}
59+
public void remove(Observable<T> observable) {
60+
sources.remove(observable);
61+
}
62+
}

0 commit comments

Comments
 (0)