Skip to content

Commit ac5f803

Browse files
committed
Implement JavaFxObservable#interval()
1 parent 7462d71 commit ac5f803

File tree

3 files changed

+57
-0
lines changed

3 files changed

+57
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package rx.javafx.sources;
2+
3+
import javafx.animation.Animation;
4+
import javafx.animation.KeyFrame;
5+
import javafx.animation.Timeline;
6+
import javafx.util.Duration;
7+
import rx.Observable;
8+
import rx.subscriptions.JavaFxSubscriptions;
9+
10+
import java.util.concurrent.atomic.AtomicLong;
11+
12+
public final class TimerSource {
13+
private TimerSource() {
14+
}
15+
16+
17+
public static <T> Observable<Long> interval(final Duration duration) {
18+
return Observable.create(sub -> {
19+
final AtomicLong value = new AtomicLong(0);
20+
Timeline timeline = new Timeline(new KeyFrame(duration, ae -> sub.onNext(value.getAndIncrement())));
21+
timeline.setCycleCount(Animation.INDEFINITE);
22+
timeline.play();
23+
24+
sub.add(JavaFxSubscriptions.unsubscribeInEventDispatchThread(timeline::stop));
25+
});
26+
}
27+
}

src/main/java/rx/observables/JavaFxObservable.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
package rx.observables;
1717

1818

19+
import javafx.animation.Animation;
20+
import javafx.animation.KeyFrame;
21+
import javafx.animation.Timeline;
1922
import javafx.beans.value.ObservableValue;
2023
import javafx.collections.ObservableList;
2124
import javafx.collections.ObservableMap;
@@ -29,9 +32,12 @@
2932
import javafx.scene.control.MenuItem;
3033
import javafx.stage.Window;
3134
import javafx.stage.WindowEvent;
35+
import javafx.util.Duration;
3236
import rx.Observable;
3337
import rx.functions.Func1;
38+
import java.util.concurrent.atomic.AtomicLong;
3439
import rx.javafx.sources.*;
40+
import rx.subscriptions.JavaFxSubscriptions;
3541

3642
import java.util.Map;
3743

@@ -281,4 +287,11 @@ public static <T> Observable<T> fromObservableSetRemovals(final ObservableSet<T>
281287
public static <T> Observable<SetChange<T>> fromObservableSetChanges(final ObservableSet<T> source) {
282288
return ObservableSetSource.fromObservableSetChanges(source);
283289
}
290+
291+
/**
292+
* Returns an Observable that emits a 0L and ever increasing numbers after each duration of time thereafter
293+
*/
294+
public static <T> Observable<Long> interval(final Duration duration) {
295+
return TimerSource.interval(duration);
296+
}
284297
}

src/test/java/rx/javafx/sources/JavaFxObservableTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import javafx.collections.FXCollections;
2323
import javafx.collections.ObservableList;
2424
import javafx.embed.swing.JFXPanel;
25+
import javafx.util.Duration;
2526
import org.junit.Test;
2627
import rx.Observable;
2728
import rx.observables.JavaFxObservable;
@@ -35,6 +36,22 @@
3536

3637
public final class JavaFxObservableTest {
3738

39+
@Test
40+
public void testIntervalSource() {
41+
new JFXPanel();
42+
43+
final CountDownLatch latch = new CountDownLatch(5);
44+
45+
JavaFxObservable.interval(Duration.millis(1000)).take(5)
46+
.subscribe(v -> latch.countDown());
47+
48+
try {
49+
latch.await();
50+
} catch (Exception e) {
51+
throw new RuntimeException(e);
52+
}
53+
}
54+
3855
@Test
3956
public void testRxObservableListAdds() {
4057
new JFXPanel();

0 commit comments

Comments
 (0)