Skip to content

Commit 90a6ca4

Browse files
authored
Merge pull request #50 from protogenes/feature/scheduler-cleanup-1.x
1.x JavaFxScheduler rewrite, fix for #48
2 parents c7e84bf + 4a5e88d commit 90a6ca4

File tree

2 files changed

+228
-228
lines changed

2 files changed

+228
-228
lines changed
Lines changed: 137 additions & 180 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2016 Netflix, Inc.
3-
*
3+
* <p>
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
* <p>
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
* <p>
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,194 +18,151 @@
1818
import javafx.animation.KeyFrame;
1919
import javafx.animation.Timeline;
2020
import javafx.application.Platform;
21-
import javafx.event.ActionEvent;
22-
import javafx.event.EventHandler;
2321
import javafx.util.Duration;
2422
import rx.Scheduler;
2523
import rx.Subscription;
2624
import rx.functions.Action0;
27-
import rx.subscriptions.BooleanSubscription;
28-
import rx.subscriptions.CompositeSubscription;
29-
import rx.subscriptions.SerialSubscription;
3025
import rx.subscriptions.Subscriptions;
3126

32-
import java.util.Queue;
33-
import java.util.concurrent.ConcurrentLinkedQueue;
3427
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicReference;
3529

3630
/**
3731
* Executes work on the JavaFx UI thread.
3832
* This scheduler should only be used with actions that execute quickly.
3933
*/
4034
public final class JavaFxScheduler extends Scheduler {
41-
private static final JavaFxScheduler INSTANCE = new JavaFxScheduler();
42-
43-
/* package for unit test */JavaFxScheduler() {
44-
}
45-
46-
public static JavaFxScheduler getInstance() {
47-
return INSTANCE;
48-
}
49-
public static JavaFxScheduler platform() {
50-
return INSTANCE;
51-
}
52-
53-
private static void assertThatTheDelayIsValidForTheJavaFxTimer(long delay) {
54-
if (delay < 0 || delay > Integer.MAX_VALUE) {
55-
throw new IllegalArgumentException(String.format("The JavaFx timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
56-
}
57-
}
58-
59-
@Override
60-
public Worker createWorker() {
61-
return new InnerJavaFxScheduler();
62-
}
63-
64-
private static class InnerJavaFxScheduler extends Worker implements Runnable {
65-
66-
private final CompositeSubscription tracking = new CompositeSubscription();
67-
68-
/** Allows cheaper trampolining than invokeLater(). Accessed from EDT only. */
69-
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
70-
/** Allows cheaper trampolining than invokeLater(). Accessed from EDT only. */
71-
private int wip;
72-
73-
@Override
74-
public void unsubscribe() {
75-
tracking.unsubscribe();
76-
}
77-
78-
@Override
79-
public boolean isUnsubscribed() {
80-
return tracking.isUnsubscribed();
81-
}
82-
83-
@Override
84-
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
85-
long delay = Math.max(0,unit.toMillis(delayTime));
86-
assertThatTheDelayIsValidForTheJavaFxTimer(delay);
87-
88-
class DualAction implements EventHandler<ActionEvent>, Subscription, Runnable {
89-
private Timeline timeline;
90-
final SerialSubscription subs = new SerialSubscription();
91-
boolean nonDelayed;
92-
93-
private void setTimer(Timeline timeline) {
94-
this.timeline = timeline;
95-
}
96-
97-
@Override
98-
public void handle(ActionEvent event) {
99-
run();
100-
}
101-
102-
@Override
103-
public void run() {
104-
if (nonDelayed) {
105-
try {
106-
if (tracking.isUnsubscribed() || isUnsubscribed()) {
107-
return;
108-
}
109-
action.call();
110-
} finally {
111-
subs.unsubscribe();
112-
}
113-
} else {
114-
timeline.stop();
115-
timeline = null;
116-
nonDelayed = true;
117-
trampoline(this);
118-
}
119-
}
120-
121-
@Override
122-
public boolean isUnsubscribed() {
123-
return subs.isUnsubscribed();
124-
}
125-
126-
@Override
127-
public void unsubscribe() {
128-
subs.unsubscribe();
129-
}
130-
public void set(Subscription s) {
131-
subs.set(s);
132-
}
133-
}
134-
135-
final DualAction executeOnce = new DualAction();
136-
tracking.add(executeOnce);
137-
138-
final Timeline timer = new Timeline(new KeyFrame(Duration.millis(delay), executeOnce));
139-
executeOnce.setTimer(timer);
140-
timer.play();
141-
142-
executeOnce.set(Subscriptions.create(() -> {
143-
timer.stop();
144-
tracking.remove(executeOnce);
145-
}));
146-
147-
return executeOnce;
148-
}
149-
150-
@Override
151-
public Subscription schedule(final Action0 action) {
152-
final BooleanSubscription s = BooleanSubscription.create();
153-
Runnable runnable = () -> {
154-
try {
155-
if (tracking.isUnsubscribed() || s.isUnsubscribed()) {
156-
return;
157-
}
158-
action.call();
159-
} finally {
160-
tracking.remove(s);
161-
}
162-
};
163-
tracking.add(s);
164-
165-
if (Platform.isFxApplicationThread()) {
166-
if (trampoline(runnable)) {
167-
return Subscriptions.unsubscribed();
168-
}
169-
}else {
170-
queue.offer(runnable);
171-
Platform.runLater(this);
172-
}
173-
174-
// wrap for returning so it also removes it from the 'innerSubscription'
175-
return Subscriptions.create(() -> tracking.remove(s));
176-
}
177-
/**
178-
* Uses a fast-path/slow path trampolining and tries to run
179-
* the given runnable directly.
180-
* @param runnable
181-
* @return true if the fast path was taken
182-
*/
183-
boolean trampoline(Runnable runnable) {
184-
// fast path: if wip increments from 0 to 1
185-
if (wip == 0) {
186-
wip = 1;
187-
runnable.run();
188-
// but a recursive schedule happened
189-
if (--wip > 0) {
190-
do {
191-
Runnable r = queue.poll();
192-
r.run();
193-
} while (--wip > 0);
194-
}
195-
return true;
196-
}
197-
queue.offer(runnable);
198-
run();
199-
return false;
200-
}
201-
@Override
202-
public void run() {
203-
if (wip++ == 0) {
204-
do {
205-
Runnable r = queue.poll();
206-
r.run();
207-
} while (--wip > 0);
208-
}
209-
}
210-
}
35+
private static final JavaFxScheduler INSTANCE = new JavaFxScheduler();
36+
37+
/* package for unit test */JavaFxScheduler() {
38+
}
39+
40+
public static JavaFxScheduler getInstance() {
41+
return INSTANCE;
42+
}
43+
44+
public static JavaFxScheduler platform() {
45+
return INSTANCE;
46+
}
47+
48+
private static void assertThatTheDelayIsValidForTheJavaFxTimer(long delay) {
49+
if (delay < 0 || delay > Integer.MAX_VALUE) {
50+
throw new IllegalArgumentException(String.format("The JavaFx timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE));
51+
}
52+
}
53+
54+
@Override
55+
public Worker createWorker() {
56+
return new JavaFxWorker();
57+
}
58+
59+
/**
60+
* A Worker implementation which manages a queue of QueuedRunnable for execution on the Java FX Application thread
61+
* For a simpler implementation the queue always contains at least one element.
62+
* {@link #head} is the element, which is in execution or was last executed
63+
* {@link #tail} is an atomic reference to the last element in the queue, or null when the worker was disposed
64+
* Recursive actions are not preferred and inserted at the tail of the queue as any other action would be
65+
* The Worker will only schedule a single job with {@link Platform#runLater(Runnable)} for when the queue was previously empty
66+
*/
67+
private static class JavaFxWorker extends Worker implements Runnable {
68+
private volatile QueuedRunnable head = new QueuedRunnable(null); /// only advanced in run(), initialised with a starter element
69+
private final AtomicReference<QueuedRunnable> tail = new AtomicReference<>(head); /// points to the last element, null when disposed
70+
71+
private static class QueuedRunnable extends AtomicReference<QueuedRunnable> implements Subscription, Action0 {
72+
private volatile Action0 action;
73+
74+
private QueuedRunnable(Action0 action) {
75+
this.action = action;
76+
}
77+
78+
@Override
79+
public void unsubscribe() {
80+
action = null;
81+
}
82+
83+
@Override
84+
public boolean isUnsubscribed() {
85+
return action == null;
86+
}
87+
88+
@Override
89+
public void call() {
90+
Action0 action = this.action;
91+
if (action != null) {
92+
action.call();
93+
}
94+
this.action = null;
95+
}
96+
}
97+
98+
@Override
99+
public void unsubscribe() {
100+
tail.set(null);
101+
QueuedRunnable qr = this.head;
102+
while (qr != null) {
103+
qr.unsubscribe();
104+
qr = qr.getAndSet(null);
105+
}
106+
}
107+
108+
@Override
109+
public boolean isUnsubscribed() {
110+
return tail.get() == null;
111+
}
112+
113+
@Override
114+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
115+
long delay = Math.max(0, unit.toMillis(delayTime));
116+
assertThatTheDelayIsValidForTheJavaFxTimer(delay);
117+
118+
final QueuedRunnable queuedRunnable = new QueuedRunnable(action);
119+
if (delay == 0) { // delay is too small for the java fx timer, schedule it without delay
120+
return schedule(queuedRunnable);
121+
}
122+
123+
final Timeline timer = new Timeline(new KeyFrame(Duration.millis(delay), event -> schedule(queuedRunnable)));
124+
timer.play();
125+
126+
return Subscriptions.create(() -> {
127+
queuedRunnable.unsubscribe();
128+
timer.stop();
129+
});
130+
}
131+
132+
@Override
133+
public Subscription schedule(final Action0 action) {
134+
if (isUnsubscribed()) {
135+
return Subscriptions.unsubscribed();
136+
}
137+
138+
final QueuedRunnable queuedRunnable = action instanceof QueuedRunnable ? (QueuedRunnable) action : new QueuedRunnable(action);
139+
140+
QueuedRunnable tailPivot;
141+
do {
142+
tailPivot = tail.get();
143+
} while (tailPivot != null && !tailPivot.compareAndSet(null, queuedRunnable));
144+
145+
if (tailPivot == null) {
146+
queuedRunnable.unsubscribe();
147+
} else {
148+
tail.compareAndSet(tailPivot, queuedRunnable); // can only fail with a concurrent dispose and we don't want to override the disposed value
149+
if (tailPivot == head) {
150+
if (Platform.isFxApplicationThread()) {
151+
run();
152+
} else {
153+
Platform.runLater(this);
154+
}
155+
}
156+
}
157+
return queuedRunnable;
158+
}
159+
160+
@Override
161+
public void run() {
162+
for (QueuedRunnable qr = head.get(); qr != null; qr = qr.get()) {
163+
qr.call();
164+
head = qr;
165+
}
166+
}
167+
}
211168
}

0 commit comments

Comments
 (0)