Skip to content

Commit 6cb1874

Browse files
committed
Overhaul CompositeObservable with Transformer argument
1 parent 7c85d89 commit 6cb1874

File tree

3 files changed

+33
-55
lines changed

3 files changed

+33
-55
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.0.5
1+
version=0.2.0

src/main/java/rx/javafx/sources/CompositeObservable.java

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,47 +16,65 @@
1616
package rx.javafx.sources;
1717

1818
import javafx.collections.FXCollections;
19-
import javafx.collections.ObservableList;
2019
import javafx.collections.ObservableSet;
2120
import rx.Observable;
21+
import rx.annotations.Beta;
2222
import rx.observables.JavaFxObservable;
23-
import java.util.HashSet;
23+
2424
import java.util.Arrays;
25+
import java.util.HashSet;
26+
2527

2628
/**
2729
* A CompositeObservable can merge multiple Observables that can be added/removed at any time,
2830
* affecting all Subscribers regardless of when they subscribed. This is especially helpful for merging
29-
* multiple UI event sources.
31+
* multiple UI event sources. You can also pass a Transformer to perform
32+
* further operations on the combined Observable that is returned
3033
*
3134
* @param <T>
3235
*/
36+
@Beta
3337
public final class CompositeObservable<T> {
3438

3539
private final ObservableSet<Observable<T>> sources;
36-
private final int initialCapacity;
40+
private final Observable<T> output;
3741

42+
/**
43+
* Creates a new CompositeObservable
44+
*/
3845
public CompositeObservable() {
39-
this(-1);
46+
this(null);
4047
}
4148

42-
public CompositeObservable(int initialCapacity) {
43-
this.initialCapacity = initialCapacity;
49+
/**
50+
* Creates a new CompositeObservable with the provided transformations applied to the returned Observable
51+
* yield from `toObservable()`. For instance, you can pass `obs -> obs.replay(1).refCount()` to make this CompositeObservable
52+
* replay one emission or `obs -> obs.share()` to multicast it.
53+
* @param transformer
54+
*/
55+
public CompositeObservable(Observable.Transformer<T,T> transformer) {
4456
sources = FXCollections.synchronizedObservableSet(FXCollections.observableSet(new HashSet<>()));
45-
}
4657

47-
public Observable<T> toObservable() {
4858
Observable<T> updatingSource = Observable.merge(
4959
Observable.from(sources).flatMap(obs -> obs.takeWhile(v -> sources.contains(obs))),
5060
JavaFxObservable.fromObservableSetAdds(sources).flatMap(obs -> obs.takeWhile(v -> sources.contains(obs)))
5161
);
5262

53-
if (initialCapacity > 0) {
54-
return updatingSource.cacheWithInitialCapacity(initialCapacity);
63+
if (transformer == null) {
64+
output = updatingSource;
5565
} else {
56-
return updatingSource;
66+
output = updatingSource.compose(transformer);
5767
}
5868
}
5969

70+
/**
71+
* Returns the `Observable` combining all the source Observables, with any transformations that were specified
72+
* on construction.
73+
* @return
74+
*/
75+
public Observable<T> toObservable() {
76+
return output;
77+
}
6078
public void add(Observable<T> observable) {
6179
sources.add(observable);
6280
}
@@ -72,7 +90,7 @@ public void removeAll(Observable<T>... observables) {
7290
public void clear() {
7391
sources.clear();
7492
}
75-
public ObservableSet<Observable<T>> getBackingSet() {
93+
public ObservableSet<Observable<T>> getSources() {
7694
return FXCollections.unmodifiableObservableSet(sources);
7795
}
7896
}

src/test/java/rx/javafx/sources/JavaFxObservableTest.java

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import rx.schedulers.Schedulers;
3131
import rx.subjects.PublishSubject;
3232

33-
import java.util.Arrays;
3433
import java.util.ArrayList;
34+
import java.util.Arrays;
3535
import java.util.List;
3636
import java.util.concurrent.CountDownLatch;
3737

@@ -344,44 +344,4 @@ public void testcompositeObservableInfinite() {
344344
e.printStackTrace();
345345
}
346346
}
347-
348-
@Test
349-
public void testcompositeObservableFinite() {
350-
351-
new JFXPanel();
352-
353-
CountDownLatch latch = new CountDownLatch(1);
354-
355-
Platform.runLater(() -> {
356-
final List<String> emissions = new ArrayList<>();
357-
CompositeObservable<String> compositeObservable = new CompositeObservable<>();
358-
359-
Observable<String> source1 = Observable.just("Alpha","Beta");
360-
Observable<String> source2 = Observable.just("Gamma","Delta");
361-
362-
compositeObservable.add(source1);
363-
364-
compositeObservable.toObservable().subscribe(emissions::add);
365-
366-
compositeObservable.add(source2);
367-
368-
assertTrue(emissions.size() == 4);
369-
370-
compositeObservable.remove(source2);
371-
372-
assertTrue(emissions.size() == 4);
373-
374-
compositeObservable.add(source2);
375-
376-
assertTrue(emissions.size() == 6);
377-
378-
latch.countDown();
379-
});
380-
381-
try {
382-
latch.await();
383-
} catch (InterruptedException e) {
384-
e.printStackTrace();
385-
}
386-
}
387347
}

0 commit comments

Comments
 (0)