diff --git a/build-release-17 b/build-release-17 deleted file mode 100644 index e69de29b..00000000 diff --git a/pom.xml b/pom.xml index 796f4239..f166a079 100644 --- a/pom.xml +++ b/pom.xml @@ -86,13 +86,6 @@ nativeimage 24.1.2 provided - - - - * - * - - org.jboss.logging @@ -182,19 +175,55 @@ maven-compiler-plugin - - - - org.jboss.logging - jboss-logging-processor - ${version.jboss.logging.tools} - - - - - --add-reads=org.jboss.threads=ALL-UNNAMED - - + + + default-compile + + + + org.jboss.logging + jboss-logging-processor + ${version.jboss.logging.tools} + + + + --add-exports=java.base/jdk.internal.vm=ALL-UNNAMED,org.jboss.threads + + 21 + 21 + + + + compile-virtual-threads + compile + + compile + + + + + org.jboss.logging + jboss-logging-processor + ${version.jboss.logging.tools} + + + + **/org/jboss/threads/virtual/*.java + + 17 + + + + default-testCompile + test-compile + + testCompile + + + 17 + + + maven-source-plugin diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index e68e5fb0..6ac7a55c 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -3,6 +3,7 @@ requires jdk.unsupported; requires org.jboss.logging; requires static org.jboss.logging.annotations; + requires static org.graalvm.nativeimage; requires org.wildfly.common; requires io.smallrye.common.annotation; requires io.smallrye.common.constraint; diff --git a/src/main/java/org/jboss/threads/virtual/Access.java b/src/main/java/org/jboss/threads/virtual/Access.java new file mode 100644 index 00000000..99ba9a4d --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Access.java @@ -0,0 +1,113 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import jdk.internal.vm.ThreadContainer; + +/** + * Access methods for virtual thread internals. + */ +final class Access { + private static final MethodHandle currentCarrierThread; + private static final MethodHandle virtualThreadFactory; + private static final MethodHandle threadStartWithContainer; + private static final MethodHandle schedulerGetter; + private static final MethodHandle continuationGetter; + + static { + MethodHandle ct; + MethodHandle vtf; + MethodHandle tswc; + MethodHandle sg; + MethodHandle cg; + try { + MethodHandles.Lookup thr = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup()); + ct = thr.findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class)); + Class vtbClass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder", false, null); + try { + vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, ScheduledExecutorService.class)); + } catch (NoSuchMethodException | NoSuchMethodError ignored) { + // unpatched JDK + vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, Executor.class)); + } + // create efficient transformer + vtf = vtf.asType(MethodType.methodType(Thread.Builder.OfVirtual.class, ThreadScheduler.class)); + // todo: maybe instead, we can directly call `java.lang.ThreadBuilders.newVirtualThread` + //void start(jdk.internal.vm.ThreadContainer container) + tswc = thr.findVirtual(Thread.class, "start", MethodType.methodType(void.class, ThreadContainer.class)); + Class vtc = thr.findClass("java.lang.VirtualThread"); + MethodHandles.Lookup vthr = MethodHandles.privateLookupIn(vtc, MethodHandles.lookup()); + try { + sg = vthr.findGetter(vtc, "scheduler", ScheduledExecutorService.class); + } catch (NoSuchFieldException | NoSuchFieldError ignored) { + // unpatched JDK + sg = vthr.findGetter(vtc, "scheduler", Executor.class); + } + sg = sg.asType(MethodType.methodType(Executor.class, Thread.class)); + cg = vthr.findGetter(vtc, "runContinuation", Runnable.class).asType(MethodType.methodType(Runnable.class, Thread.class)); + } catch (Throwable e) { + // no good + throw new InternalError("Cannot initialize virtual threads", e); + } + currentCarrierThread = ct; + virtualThreadFactory = vtf; + threadStartWithContainer = tswc; + schedulerGetter = sg; + continuationGetter = cg; + } + + static Thread currentCarrier() { + try { + return (Thread) currentCarrierThread.invokeExact(); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static Thread.Builder.OfVirtual threadBuilder(ThreadScheduler threadScheduler) { + try { + return (Thread.Builder.OfVirtual) virtualThreadFactory.invokeExact(threadScheduler); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static void startThread(Thread thread, ThreadContainer threadContainer) { + try { + threadStartWithContainer.invokeExact(thread, threadContainer); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static Executor schedulerOf(Thread thread) { + try { + return (Executor) schedulerGetter.invokeExact(thread); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static Runnable continuationOf(Thread thread) { + try { + return (Runnable) continuationGetter.invokeExact(thread); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } +} diff --git a/src/main/java/org/jboss/threads/virtual/Dispatcher.java b/src/main/java/org/jboss/threads/virtual/Dispatcher.java new file mode 100644 index 00000000..eae64e97 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Dispatcher.java @@ -0,0 +1,9 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.ScheduledFuture; + +abstract class Dispatcher { + abstract void execute(UserThreadScheduler continuation); + + abstract ScheduledFuture schedule(Runnable task, long nanos); +} diff --git a/src/main/java/org/jboss/threads/virtual/EventLoop.java b/src/main/java/org/jboss/threads/virtual/EventLoop.java new file mode 100644 index 00000000..df2e2e87 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/EventLoop.java @@ -0,0 +1,51 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.locks.LockSupport; + +import io.smallrye.common.annotation.Experimental; + +/** + * An event loop for a virtual thread scheduler. + * There will be one instance per I/O thread within an event loop group. + */ +@Experimental("Experimental virtual thread support") +public abstract class EventLoop { + /** + * Construct a new instance. + */ + protected EventLoop() {} + + /** + * Unpark all ready threads and return, + * possibly waiting for some amount of time if no threads are ready. + * The wait time may be {@code 0}, in which case this method should return immediately if no threads are ready, + * or {@code -1}, in which case the method should wait indefinitely for threads to become ready. + * Otherwise, the wait time is the maximum number of nanoseconds to wait for threads to become ready before returning. + *

+ * Regardless of the wait time, the method should park or return immediately if the {@link #wakeup()} method is invoked + * from any thread. + *

+ * This method will be called in a loop (the event loop, in fact). + * After each invocation of this method, up to one other waiting thread will be continued. + * Since this generally would lead to busy-looping, + * the implementation of this method should + * {@linkplain VirtualThreads#yieldNanos(long) yield for some amount of time} before returning to allow other threads to run. + *

+ * Note that {@linkplain Thread#sleep(long) sleeping} or {@linkplain LockSupport#park() parking} may cause latency spikes, + * so it is not recommended. + *

+ * This method should only be called from the event loop virtual thread. + * + * @param waitTime {@code 0} to return immediately after unparking any ready threads (even if there are none), + * {@code -1} unpark any ready threads or to wait indefinitely for a thread to become ready, + * or any positive integer to unpark any ready threads or to wait for no more than that number of nanoseconds + * @throws InterruptedException if some interruptible operation was interrupted + */ + protected abstract void unparkAny(long waitTime) throws InterruptedException; + + /** + * Forcibly awaken the event loop, if it is currently blocked in {@link #unparkAny(long)}. + * This method may be called from any thread. + */ + protected abstract void wakeup(); +} diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java new file mode 100644 index 00000000..f0012847 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java @@ -0,0 +1,341 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.ConstantBootstraps; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Predicate; + +import io.smallrye.common.annotation.Experimental; +import org.jboss.threads.JBossThread; + +/** + * An event loop carrier thread with a built-in executor. + * To acquire the virtual thread of the event loop which is carried by this thread, + * use {@link #virtualThread()}. + */ +public final class EventLoopThread extends JBossThread implements Executor { + /** + * The number of nanoseconds to offset the wait time by, per priority point. + */ + private static final long PRIORITY_BIAS = 50_000L; + /** + * The virtual thread which runs the event handler. + */ + private final EventLoopThreadScheduler elts; + /** + * The event loop implementation. + */ + private final EventLoop eventLoop; + /** + * Comparison nanos. + * This is not representative of the current time; rather, it's a somewhat-recent (but arbitrary) + * sample of {@code System.nanoTime()}. + * Update before each queue operation and this queue can run forever, as long as no task waits for more than ~138 years. + * Such tasks might unexpectedly have a different priority. + * But it is not likely to matter at that point. + */ + private long cmpNanos; + /** + * Current nanoseconds. + * This is a snapshot of the current time, taken immediately before draining the delay queue. + */ + private long currentNanos; + /** + * The shared/slow task queue, which allows tasks to be enqueued from outside of this thread. + */ + private final LinkedBlockingQueue sq = new LinkedBlockingQueue<>(); + /** + * The task queue. + * The queue is ordered by the amount of time that each entry (thread) has been waiting to run. + */ + private final PriorityQueue q = new PriorityQueue<>(this::compare); + /** + * The bulk remover for transferring {@code sq} to {@code q}. + */ + private final Predicate bulkRemover = q::add; + /** + * The delay queue for timed sleep (patched JDKs only). + */ + private final DelayQueue> dq = new DelayQueue<>(); + /** + * The event loop's dispatcher. + */ + private final Dispatcher dispatcher = new EventLoopDispatcher(); + + EventLoopThread(final Scheduler scheduler, final int idx, final EventLoop eventLoop) { + super(() -> {}, "Event loop carrier thread " + idx); + elts = new EventLoopThreadScheduler(scheduler, this, idx); + this.eventLoop = eventLoop; + } + + /** + * Create and start a virtual thread which is carried by this I/O thread. + * + * @param runnable the body of the virtual thread (must not be {@code null}) + */ + public void execute(Runnable runnable) { + elts.scheduler().executeOnEventLoop(this, runnable); + } + + /** + * {@return a new executor service which pools threads and schedules tasks to this event loop} + * Executed tasks are free to change to another event loop or the shared pool. + * Tasks are always started with an association to this event loop. + */ + @Experimental("Pooled event-loop-bound threads") + public Executor newPool() { + return new Executor() { + static final VarHandle taskHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "task", VarHandle.class, Runner.class, Runnable.class); + class Runner implements Runnable { + + private volatile Thread thread; + @SuppressWarnings("unused") // taskHandle + private volatile Runnable task; + + Runner(final Runnable task) { + this.task = task; + } + + public void run() { + thread = Thread.currentThread(); + for (;;) { + // get and run next task + Runnable task = (Runnable) taskHandle.getAndSet(this, null); + if (task != null) { + try { + task.run(); + } catch (Throwable ignored) {} + Util.clearUnpark(); + // re-add to queue /after/ clearing park permit + q.addFirst(this); + } + // wait for a new task + VirtualThreads.parkAndResumeOn(EventLoopThread.this); + } + } + } + + private final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque<>(); + + public void execute(final Runnable command) { + Runner runner; + for (;;) { + runner = q.poll(); + if (runner == null) { + // don't wait around, just start a new thread immediately + runner = new Runner(command); + EventLoopThread.this.execute(runner); + return; + } + // try to set the task + if (taskHandle.compareAndSet(runner, null, command)) { + LockSupport.unpark(runner.thread); + return; + } + } + } + }; + } + + /** + * {@return the owner of this event loop thread} + */ + Scheduler owner() { + return elts.scheduler(); + } + + /** + * {@return the event loop for this thread} + */ + public EventLoop eventLoop() { + return eventLoop; + } + + /** + * {@return the virtual thread of the event loop itself} + */ + public Thread virtualThread() { + return elts.virtualThread(); + } + + /** + * {@return the current event loop thread, or {@code null} if the current thread is not mounted on an event loop thread} + */ + public static EventLoopThread current() { + return Thread.currentThread().isVirtual() && Access.currentCarrier() instanceof EventLoopThread elt ? elt : null; + } + + public void interrupt() { + // interruption of I/O carrier threads is not allowed + } + + /** + * Run the carrier side of the event loop. + * This should only be called by {@code Thread.start()}. + * + * @throws IllegalThreadStateException if called inappropriately + */ + public void run() { + if (Thread.currentThread() != this || elts.virtualThread().isAlive()) { + throw new IllegalThreadStateException(); + } + elts.start(); + // initialize the wait-time comparison basis + cmpNanos = System.nanoTime(); + for (;;) { + // drain shared queue, hopefully somewhat efficiently + sq.removeIf(bulkRemover); + long waitTime = -1L; + if (!dq.isEmpty()) { + // process the delay queue + currentNanos = System.nanoTime(); + DelayedTask dt = dq.poll(); + while (dt != null) { + // this will indirectly cause entries to be added to `q` + dt.run(); + dt = dq.poll(); + } + dt = dq.peek(); + if (dt != null) { + // set the max wait time to the amount of time before the next scheduled task + waitTime = Math.max(1L, dt.deadline - currentNanos); + } + } + ThreadScheduler removed = q.poll(); + if (removed == null) { + // all threads are parked, even the event loop, so just wait around + LockSupport.park(); + } else { + if (removed == elts) { + // configure the wait time + elts.setWaitTime(waitTime); + } + // update for next q operation without hitting nanoTime over and over + cmpNanos = removed.waitingSinceTime(); + removed.run(); + } + } + } + + void enqueue(final ThreadScheduler continuation) { + Thread ct = Thread.currentThread(); + if (ct == this || Access.currentCarrier() == this) { + q.add(continuation); + } else { + sq.add(continuation); + LockSupport.unpark(this); + eventLoop().wakeup(); + } + } + + /** + * Compare the wait times of two tasks. + * The task that has waited for the longest time is considered earlier than the task that has a shorter wait time. + * + * @param o1 the first thread scheduler (must not be {@code null}) + * @param o2 the second thread scheduler (must not be {@code null}) + * @return the comparison result + */ + private int compare(ThreadScheduler o1, ThreadScheduler o2) { + long cmpNanos = this.cmpNanos; + + // with priority, we have a potentially greater-than-64-bit number + long w1 = o1.waitingSince(cmpNanos); + int s1 = Thread.NORM_PRIORITY - o1.priority(); + w1 += s1 * PRIORITY_BIAS; + + long w2 = o2.waitingSince(cmpNanos); + int s2 = Thread.NORM_PRIORITY - o2.priority(); + w2 += s2 * PRIORITY_BIAS; + + return Long.compare(w2, w1); + } + + ScheduledFuture schedule(final Runnable command, final long nanos) { + // it is expected that this will only be called locally + Thread ct = Thread.currentThread(); + assert ct == EventLoopThread.this; + DelayedTask dt = new DelayedTask<>(command, System.nanoTime() + nanos); + dq.add(dt); + return dt; + } + + Dispatcher dispatcher() { + return dispatcher; + } + + private final class DelayedTask implements ScheduledFuture, Runnable { + private final Runnable task; + private final long deadline; + + private DelayedTask(final Runnable task, final long deadline) { + this.task = task; + this.deadline = deadline; + } + + public long getDelay(final TimeUnit unit) { + long currentNanos = EventLoopThread.this.currentNanos; + return unit.convert(deadline - currentNanos, TimeUnit.NANOSECONDS); + } + + public int compareTo(final Delayed o) { + if (o instanceof DelayedTask dt) { + long currentNanos = EventLoopThread.this.currentNanos; + return Long.compare(deadline - currentNanos, dt.deadline - currentNanos); + } else { + throw new IllegalStateException(); + } + } + + public boolean cancel(final boolean mayInterruptIfRunning) { + Thread ct = Thread.currentThread(); + if (ct == EventLoopThread.this || ct.isVirtual() && Access.currentCarrier() == EventLoopThread.this) { + return dq.remove(this); + } + // else unsupported + return false; + } + + public boolean isCancelled() { + // unsupported + return false; + } + + public boolean isDone() { + // unsupported + return false; + } + + public V get() { + throw new UnsupportedOperationException(); + } + + public V get(final long timeout, final TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public void run() { + task.run(); + } + } + + private class EventLoopDispatcher extends Dispatcher { + void execute(final UserThreadScheduler continuation) { + enqueue(continuation); + } + + ScheduledFuture schedule(final Runnable task, final long nanos) { + return EventLoopThread.this.schedule(task, nanos); + } + } +} + diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java new file mode 100644 index 00000000..12d1e216 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java @@ -0,0 +1,66 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.ConstantBootstraps; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * The thread scheduler for an event loop thread. + */ +final class EventLoopThreadScheduler extends ThreadScheduler { + private static final VarHandle waitTimeHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "waitTime", VarHandle.class, EventLoopThreadScheduler.class, long.class); + + private final EventLoopThread eventLoopThread; + /** + * The wait time for the virtual thread side of the event loop. + * This value is handed back and forth between the I/O carrier thread and the event loop virtual thread. + */ + @SuppressWarnings("unused") // waitTimeHandle + private volatile long waitTime = -1; + + EventLoopThreadScheduler(final Scheduler scheduler, final EventLoopThread eventLoopThread, final long idx) { + super(scheduler, "event-loop-", idx); + this.eventLoopThread = eventLoopThread; + // set event loop thread to almost-maximum priority + setPriority(Thread.MAX_PRIORITY - 1); + } + + void runThreadBody() { + // this runs on the *virtual* event loop thread + EventLoopThread eventLoopThread = this.eventLoopThread; + final EventLoop eventLoop = eventLoopThread.eventLoop(); + long waitTime; + for (;;) { + // clear the unpark permit + Util.clearUnpark(); + // clear interrupt status + Thread.interrupted(); + // call the event loop + waitTime = (long) waitTimeHandle.getOpaque(this); + try { + eventLoop.unparkAny(waitTime); + } catch (Throwable ignored) { + } + // avoid starvation + if (! yielded()) { + Thread.yield(); + // yielding sets the flag to true, so clear it again + yielded(); + } + } + } + + void setWaitTime(long nanos) { + waitTimeHandle.setOpaque(this, nanos); + } + + public void execute(final Runnable command) { + eventLoopThread.enqueue(this); + } + + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + return eventLoopThread.schedule(command, unit.convert(delay, TimeUnit.NANOSECONDS)); + } +} diff --git a/src/main/java/org/jboss/threads/virtual/Scheduler.java b/src/main/java/org/jboss/threads/virtual/Scheduler.java new file mode 100644 index 00000000..f5a9c84e --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Scheduler.java @@ -0,0 +1,131 @@ +package org.jboss.threads.virtual; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import io.smallrye.common.annotation.Experimental; +import io.smallrye.common.constraint.Assert; +import jdk.internal.vm.ThreadContainer; +import org.jboss.threads.EnhancedQueueExecutor; + +/** + * A virtual thread scheduler. + */ +@Experimental("Experimental virtual thread support") +public final class Scheduler implements Executor { + private final EnhancedQueueExecutor blockingPool; + private final Container container = new Container(); + private final AtomicInteger eventLoopIdx = new AtomicInteger(1); + private final AtomicLong threadIdx = new AtomicLong(1); + private final PoolDispatcher poolDispatcher = new PoolDispatcher(); + + /** + * Construct a new instance. + * + * @param blockingPool the blocking pool to use (must not be {@code null}) + */ + Scheduler(final EnhancedQueueExecutor blockingPool) { + this.blockingPool = Assert.checkNotNullParam("blockingPool", blockingPool); + } + + /** + * Construct and start a new event loop thread for this scheduler. + * + * @param eventLoop the event loop to use (must not be {@code null}) + * @return the new event loop thread (not {@code null}) + * @throws NullPointerException if the factory returned a {@code null} event loop + */ + public EventLoopThread newEventLoopThread(EventLoop eventLoop) { + Assert.checkNotNullParam("eventLoop", eventLoop); + EventLoopThread eventLoopThread = new EventLoopThread(this, eventLoopIdx.getAndIncrement(), eventLoop); + eventLoopThread.start(); + return eventLoopThread; + } + + /** + * Construct a new instance. + * + * @param blockingPool the blocking pool to use (must not be {@code null}) + * @return the new scheduler (not {@code null}) + */ + public static Scheduler create(EnhancedQueueExecutor blockingPool) { + return new Scheduler(Assert.checkNotNullParam("blockingPool", blockingPool)); + } + + /** + * Execute the given task in a new virtual thread managed by this scheduler, initially scheduled as a worker + * (CPU-bound) task. + * + * @param command the runnable task + */ + public void execute(final Runnable command) { + new UserThreadScheduler(this, Assert.checkNotNullParam("command", command), threadIdx.getAndIncrement()).start(); + } + + void executeOnEventLoop(final EventLoopThread eventLoopThread, final Runnable command) { + new UserThreadScheduler(this, command, threadIdx.getAndIncrement(), eventLoopThread).start(); + } + + ThreadContainer container() { + return container; + } + + Dispatcher poolDispatcher() { + return poolDispatcher; + } + + static final class Container extends ThreadContainer { + static final boolean DEBUG = false; + final Set threads = ConcurrentHashMap.newKeySet(); + + private Container() { + super(true); + } + + public void onStart(final Thread thread) { + // todo: track shutdown + if (DEBUG) threads.add(thread); + } + + public void onExit(final Thread thread) { + // todo: track shutdown + if (DEBUG) threads.remove(thread); + } + + public String name() { + return "managed"; + } + + public long threadCount() { + return threads.size(); + } + + protected boolean tryClose() { + return super.tryClose(); + } + + public Thread owner() { + return super.owner(); + } + + public Stream threads() { + return DEBUG ? threads.stream() : Stream.empty(); + } + } + + private class PoolDispatcher extends Dispatcher { + void execute(final UserThreadScheduler continuation) { + blockingPool.execute(continuation); + } + + ScheduledFuture schedule(final Runnable task, final long nanos) { + return blockingPool.schedule(task, nanos, TimeUnit.NANOSECONDS); + } + } +} diff --git a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java new file mode 100644 index 00000000..0af0138b --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java @@ -0,0 +1,210 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.ConstantBootstraps; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * The base class for an individual thread's scheduler. + */ +abstract class ThreadScheduler implements ScheduledExecutorService, Runnable { + private static final VarHandle yieldedHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "yielded", VarHandle.class, ThreadScheduler.class, boolean.class); + private static final VarHandle delayHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "delay", VarHandle.class, ThreadScheduler.class, long.class); + private static final VarHandle priorityHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "priority", VarHandle.class, ThreadScheduler.class, int.class); + /** + * The system nanos time when this task started waiting. + * Accessed only by carrier threads. + */ + private long waitingSinceTime; + /** + * The current thread priority, between {@link Thread#MIN_PRIORITY} (lowest) or {@link Thread#MAX_PRIORITY}. + * Accessed by carrier threads and virtual threads. + */ + @SuppressWarnings("unused") // priorityHandle + private int priority = Thread.NORM_PRIORITY; + /** + * The nanosecond delay time for scheduling. + * Accessed by carrier threads and virtual threads. + */ + @SuppressWarnings("unused") // delayHandle + private long delay; + /** + * Indicate if this thread parked or yielded, to detect a starvation scenario. + * Accessed by carrier threads and virtual threads. + */ + @SuppressWarnings("unused") // yieldedHandle + volatile boolean yielded; + + /** + * The owning scheduler. + */ + private final Scheduler scheduler; + /** + * The virtual thread associated with this task. + */ + private final Thread virtualThread; + + ThreadScheduler(final Scheduler scheduler, final String name, final long idx) { + this.scheduler = scheduler; + this.virtualThread = Access.threadBuilder(this).inheritInheritableThreadLocals(false).name(name, idx).unstarted(this::runThreadBody); + } + + void start() { + Access.startThread(virtualThread, scheduler.container()); + } + + abstract void runThreadBody(); + + void delayBy(long nanos) { + delayHandle.setOpaque(this, nanos); + } + + long waitingSinceTime() { + return waitingSinceTime; + } + + int priority() { + return (int) priorityHandle.getOpaque(this); + } + + void setPriority(final int priority) { + priorityHandle.setOpaque(this, priority); + } + + /** + * {@return the number of nanoseconds that this thread has been waiting for} + * The higher the waiting-since time, the higher priority a thread will have. + * This value may be negative if the wait time includes a delay. + * + * @param current the current time + */ + long waitingSince(long current) { + long delay = (long) delayHandle.getOpaque(this); + // delay is always 0 or positive, so result may be negative + return Math.max(0, current - waitingSinceTime) - delay; + } + + /** + * Run the continuation for the current thread. + */ + public void run() { + assert ! Thread.currentThread().isVirtual(); + // reset delay + delayBy(0); + // todo: we could change the carrier thread priority when running on the pool, but it might not pay off + try { + Access.continuationOf(virtualThread).run(); + } finally { + waitingSinceTime = System.nanoTime(); + yieldedHandle.setOpaque(this, true); + } + } + + /** + * {@return {@code true} if this scheduler has yielded since the last invocation of this method, or {@code false} otherwise} + */ + boolean yielded() { + boolean yielded = (boolean) yieldedHandle.getOpaque(this); + if (yielded) { + yieldedHandle.setOpaque(this, false); + } + return yielded; + } + + /** + * {@return the overall scheduler for this thread scheduler} + */ + Scheduler scheduler() { + return scheduler; + } + + /** + * {@return the virtual thread associated with this thread scheduler} + */ + Thread virtualThread() { + return virtualThread; + } + + /** + * Schedule the given command, which may be the continuation for this thread. + * + * @param command the command (must not be {@code null}) + */ + public abstract void execute(Runnable command); + + /** + * Schedule the given command, which is generally an unpark request for some other sleeping thread. + * + * @param command the command (must not be {@code null}) + */ + public abstract ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + + // unimplemented methods + + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public void shutdown() { + throw new UnsupportedOperationException(); + } + + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public Future submit(Callable task) { + throw new UnsupportedOperationException(); + } + + public Future submit(Runnable task, T result) { + throw new UnsupportedOperationException(); + } + + public Future submit(Runnable task) { + throw new UnsupportedOperationException(); + } + + public List> invokeAll(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public T invokeAny(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } +} diff --git a/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java new file mode 100644 index 00000000..73778530 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java @@ -0,0 +1,98 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * A scheduler for a specific thread. + */ +final class UserThreadScheduler extends ThreadScheduler { + /** + * The current scheduling executor. + */ + private Dispatcher dispatcher; + /** + * The user task. + */ + private Runnable task; + + UserThreadScheduler(final Scheduler scheduler, final Runnable task, final long idx) { + this(scheduler, task, scheduler.poolDispatcher(), idx); + } + + UserThreadScheduler(final Scheduler scheduler, final Runnable task, final long idx, final EventLoopThread eventLoopThread) { + this(scheduler, task, eventLoopThread.dispatcher(), idx); + } + + private UserThreadScheduler(final Scheduler scheduler, final Runnable task, final Dispatcher dispatcher, final long idx) { + super(scheduler, "user-", idx); + this.dispatcher = dispatcher; + this.task = task; + } + + void runThreadBody() { + Runnable task = this.task; + // release the reference + this.task = null; + task.run(); + } + + public void execute(Runnable command) { + if (command == Access.continuationOf(virtualThread())) { + dispatcher.execute(this); + } else { + // we can only execute our continuation + throw new IllegalStateException(); + } + } + + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + // command is going to be VirtualThread::unpark or similar (runnable from carrier thread) + return dispatcher.schedule(command, unit.toNanos(delay)); + } + + void resumeOn(final Dispatcher dispatcher) { + if (dispatcher != this.dispatcher) { + this.dispatcher = dispatcher; + Thread.yield(); + } + } + + void parkAndResumeOn(final Object blocker, final Dispatcher dispatcher) { + if (dispatcher != this.dispatcher) { + this.dispatcher = dispatcher; + // clear yielded flag + yielded(); + if (blocker == null) { + LockSupport.park(); + } else { + LockSupport.park(blocker); + } + if (! yielded()) { + // park didn't block, so manually reschedule + Thread.yield(); + } + } else { + if (blocker == null) { + LockSupport.park(); + } else { + LockSupport.park(blocker); + } + } + } + + void parkNanosAndResumeOn(final Object blocker, final long nanos, final Dispatcher dispatcher) { + if (dispatcher != this.dispatcher) { + this.dispatcher = dispatcher; + // todo: we have to yield now so that we don't end up calling `schedule` from the wrong thread + // we can optimize this by examining the dispatcher or by synchronizing the queue somehow + Thread.yield(); + } + if (blocker == null) { + LockSupport.parkNanos(nanos); + } else { + LockSupport.parkNanos(blocker, nanos); + } + } +} diff --git a/src/main/java/org/jboss/threads/virtual/Util.java b/src/main/java/org/jboss/threads/virtual/Util.java new file mode 100644 index 00000000..093d31ce --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Util.java @@ -0,0 +1,17 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.locks.LockSupport; + +/** + * Utilities. + */ +final class Util { + private Util() {} + + static void clearUnpark() { + // change unpark permit from (0 or 1) to (1) + LockSupport.unpark(Thread.currentThread()); + // change unpark permit from (1) to (0) + LockSupport.park(); + } +} diff --git a/src/main/java/org/jboss/threads/virtual/VirtualThreads.java b/src/main/java/org/jboss/threads/virtual/VirtualThreads.java new file mode 100644 index 00000000..34d12e3d --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/VirtualThreads.java @@ -0,0 +1,225 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.locks.LockSupport; + +import io.smallrye.common.constraint.Assert; + +/** + * A class with utility methods for manipulating the current virtual thread. + */ +public final class VirtualThreads { + private VirtualThreads() {} + + /** + * {@return the current event loop carrier thread, or {@code null} if the current thread is not currently carried by an event loop thread} + */ + public static EventLoopThread currentEventLoopThread() { + if (Thread.currentThread().isVirtual() && Access.currentCarrier() instanceof EventLoopThread elt) { + return elt; + } else { + return null; + } + } + + /** + * Indicate that the current thread is going to be performing I/O-intensive operations + * with relatively little CPU or native usage. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + */ + public static void resumeOn(EventLoopThread eventLoopThread) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.resumeOn(eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Indicate that the current thread is going to be performing native or CPU-intensive operations + * with relatively little I/O usage. + * After this method returns, the current thread will be carried by a pool thread. + */ + public static void resumeOnPool() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.resumeOn(ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @see LockSupport#park() + */ + public static void parkAndResumeOn(EventLoopThread eventLoopThread) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkAndResumeOn(null, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param blocker the blocker object to register + * @see LockSupport#park(Object) + */ + public static void parkAndResumeOn(EventLoopThread eventLoopThread, Object blocker) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkAndResumeOn(blocker, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * @see LockSupport#park() + */ + public static void parkAndResumeOnPool() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkAndResumeOn(null, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param blocker the blocker object to register + * @see LockSupport#park(Object) + */ + public static void parkAndResumeOnPool(Object blocker) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkAndResumeOn(blocker, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(long) + */ + public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, long nanos) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkNanosAndResumeOn(null, nanos, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param blocker the blocker object to register (see {@link LockSupport#park(Object)}) + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(Object, long) + */ + public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, Object blocker, long nanos) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkNanosAndResumeOn(blocker, nanos, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(long) + */ + public static void parkNanosAndResumeOnPool(long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkNanosAndResumeOn(null, nanos, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param blocker the blocker object to register + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(Object, long) + */ + public static void parkNanosAndResumeOnPool(Object blocker, long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkNanosAndResumeOn(blocker, nanos, ts.scheduler().poolDispatcher()); + } + } + + /** + * Yield execution to any task is already waiting or will start waiting within the next {@code nanos} nanoseconds. + * If no tasks remain within the given criteria, the current thread will resume. + * + * @param nanos the number of nanoseconds to attempt to yield for + */ + public static void yieldNanos(long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + ts.delayBy(Math.max(0, nanos)); + } + Thread.yield(); + } + + /** + * {@return the priority of the current thread or virtual thread} + */ + public static int currentPriority() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + return ts.priority(); + } else { + return thread.getPriority(); + } + } + + /** + * Change the priority of the current virtual thread, if possible. + * + * @param newPriority the new virtual thread priority + */ + public static void changePriority(int newPriority) { + newPriority = Math.min(Math.max(newPriority, Thread.MIN_PRIORITY), Thread.MAX_PRIORITY); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + int old = ts.priority(); + if (newPriority != old) { + // apply new priority + ts.setPriority(newPriority); + Thread.yield(); + } + } else { + thread.setPriority(newPriority); + } + } +} diff --git a/src/main/java/org/jboss/threads/virtual/package-info.java b/src/main/java/org/jboss/threads/virtual/package-info.java new file mode 100644 index 00000000..fa15017c --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/package-info.java @@ -0,0 +1,7 @@ +/** + * Support for advanced virtual thread scheduling. + */ +@Experimental("Experimental virtual thread support") +package org.jboss.threads.virtual; + +import io.smallrye.common.annotation.Experimental; \ No newline at end of file