From a088a4db788bcee60331e99f51394491893f91e2 Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Wed, 12 Feb 2025 15:42:05 -0600 Subject: [PATCH 1/8] Initial support for virtual thread migration through thread pools --- build-release-17 | 0 pom.xml | 69 ++-- src/main/java/module-info.java | 1 + .../org/jboss/threads/virtual/Access.java | 107 +++++++ .../org/jboss/threads/virtual/Dispatcher.java | 9 + .../org/jboss/threads/virtual/EventLoop.java | 55 ++++ .../threads/virtual/EventLoopThread.java | 286 +++++++++++++++++ .../virtual/EventLoopThreadScheduler.java | 76 +++++ .../java/org/jboss/threads/virtual/Mode.java | 16 + .../org/jboss/threads/virtual/Scheduler.java | 300 ++++++++++++++++++ .../threads/virtual/ThreadScheduler.java | 157 +++++++++ .../threads/virtual/UserThreadScheduler.java | 125 ++++++++ .../jboss/threads/virtual/package-info.java | 7 + 13 files changed, 1188 insertions(+), 20 deletions(-) delete mode 100644 build-release-17 create mode 100644 src/main/java/org/jboss/threads/virtual/Access.java create mode 100644 src/main/java/org/jboss/threads/virtual/Dispatcher.java create mode 100644 src/main/java/org/jboss/threads/virtual/EventLoop.java create mode 100644 src/main/java/org/jboss/threads/virtual/EventLoopThread.java create mode 100644 src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java create mode 100644 src/main/java/org/jboss/threads/virtual/Mode.java create mode 100644 src/main/java/org/jboss/threads/virtual/Scheduler.java create mode 100644 src/main/java/org/jboss/threads/virtual/ThreadScheduler.java create mode 100644 src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java create mode 100644 src/main/java/org/jboss/threads/virtual/package-info.java diff --git a/build-release-17 b/build-release-17 deleted file mode 100644 index e69de29b..00000000 diff --git a/pom.xml b/pom.xml index 796f4239..f166a079 100644 --- a/pom.xml +++ b/pom.xml @@ -86,13 +86,6 @@ nativeimage 24.1.2 provided - - - - * - * - - org.jboss.logging @@ -182,19 +175,55 @@ maven-compiler-plugin - - - - org.jboss.logging - jboss-logging-processor - ${version.jboss.logging.tools} - - - - - --add-reads=org.jboss.threads=ALL-UNNAMED - - + + + default-compile + + + + org.jboss.logging + jboss-logging-processor + ${version.jboss.logging.tools} + + + + --add-exports=java.base/jdk.internal.vm=ALL-UNNAMED,org.jboss.threads + + 21 + 21 + + + + compile-virtual-threads + compile + + compile + + + + + org.jboss.logging + jboss-logging-processor + ${version.jboss.logging.tools} + + + + **/org/jboss/threads/virtual/*.java + + 17 + + + + default-testCompile + test-compile + + testCompile + + + 17 + + + maven-source-plugin diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index e68e5fb0..6ac7a55c 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -3,6 +3,7 @@ requires jdk.unsupported; requires org.jboss.logging; requires static org.jboss.logging.annotations; + requires static org.graalvm.nativeimage; requires org.wildfly.common; requires io.smallrye.common.annotation; requires io.smallrye.common.constraint; diff --git a/src/main/java/org/jboss/threads/virtual/Access.java b/src/main/java/org/jboss/threads/virtual/Access.java new file mode 100644 index 00000000..158bed30 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Access.java @@ -0,0 +1,107 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import jdk.internal.vm.ThreadContainer; + +/** + * Access methods for virtual thread internals. + */ +final class Access { + private static final MethodHandle currentCarrierThread; + private static final MethodHandle virtualThreadFactory; + private static final MethodHandle threadStartWithContainer; + private static final MethodHandle schedulerGetter; + private static final MethodHandle continuationGetter; + + static { + MethodHandle ct; + MethodHandle vtf; + MethodHandle tswc; + MethodHandle sg; + MethodHandle cg; + try { + MethodHandles.Lookup thr = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup()); + ct = thr.findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class)); + Class vtbClass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder", false, null); + try { + vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, Executor.class)); + } catch (NoSuchMethodError ignored) { + // patched JDK + vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, ScheduledExecutorService.class)); + } + // create efficient transformer + vtf = vtf.asType(MethodType.methodType(Thread.Builder.OfVirtual.class, ThreadScheduler.class)); + // todo: maybe instead, we can directly call `java.lang.ThreadBuilders.newVirtualThread` + //void start(jdk.internal.vm.ThreadContainer container) + tswc = thr.findVirtual(Thread.class, "start", MethodType.methodType(void.class, ThreadContainer.class)); + Class vtc = thr.findClass("java.lang.VirtualThread"); + MethodHandles.Lookup vthr = MethodHandles.privateLookupIn(vtc, MethodHandles.lookup()); + sg = vthr.findGetter(vtc, "scheduler", Executor.class).asType(MethodType.methodType(Executor.class, Thread.class)); + cg = vthr.findGetter(vtc, "runContinuation", Runnable.class).asType(MethodType.methodType(Runnable.class, Thread.class)); + } catch (Throwable e) { + // no good + throw new InternalError("Cannot initialize virtual threads", e); + } + currentCarrierThread = ct; + virtualThreadFactory = vtf; + threadStartWithContainer = tswc; + schedulerGetter = sg; + continuationGetter = cg; + } + + static Thread currentCarrier() { + try { + return (Thread) currentCarrierThread.invokeExact(); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static Thread.Builder.OfVirtual threadBuilder(ThreadScheduler threadScheduler) { + try { + return (Thread.Builder.OfVirtual) virtualThreadFactory.invokeExact(threadScheduler); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static void startThread(Thread thread, ThreadContainer threadContainer) { + try { + threadStartWithContainer.invokeExact(thread, threadContainer); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static ThreadScheduler schedulerOf(Thread thread) { + try { + return (ThreadScheduler) (Executor) schedulerGetter.invokeExact(thread); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } + + static Runnable continuationOf(Thread thread) { + try { + return (Runnable) continuationGetter.invokeExact(thread); + } catch (RuntimeException | Error e) { + throw e; + } catch (Throwable e) { + throw new UndeclaredThrowableException(e); + } + } +} diff --git a/src/main/java/org/jboss/threads/virtual/Dispatcher.java b/src/main/java/org/jboss/threads/virtual/Dispatcher.java new file mode 100644 index 00000000..eae64e97 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Dispatcher.java @@ -0,0 +1,9 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.ScheduledFuture; + +abstract class Dispatcher { + abstract void execute(UserThreadScheduler continuation); + + abstract ScheduledFuture schedule(Runnable task, long nanos); +} diff --git a/src/main/java/org/jboss/threads/virtual/EventLoop.java b/src/main/java/org/jboss/threads/virtual/EventLoop.java new file mode 100644 index 00000000..1e80c07d --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/EventLoop.java @@ -0,0 +1,55 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.locks.LockSupport; + +import io.smallrye.common.annotation.Experimental; + +/** + * An event loop for a virtual thread scheduler. + * There will be one instance per I/O thread within an event loop group. + */ +@Experimental("Experimental virtual thread support") +public abstract class EventLoop { + /** + * Construct a new instance. + */ + protected EventLoop() {} + + /** + * Unpark all ready threads and return, + * possibly waiting for some amount of time if no threads are ready. + * The wait time may be {@code 0}, in which case this method should return immediately if no threads are ready, + * or {@code -1}, in which case the method should wait indefinitely for threads to become ready. + * Otherwise, the wait time is the maximum number of nanoseconds to wait for threads to become ready before returning. + *

+ * Regardless of the wait time, the method should park or return immediately if the {@link #wakeup()} method is invoked + * from any thread. + *

+ * This method will be called in a loop (the event loop, in fact). + * After each invocation of this method, up to one other waiting thread will be continued. + * Since this generally would lead to busy-looping, + * the implementation of this method should {@linkplain LockSupport#parkNanos(long) park} for some amount of time before returning. + * While the event loop method is parked, + * other threads will be allowed to run. + * If the set of ready threads is exhausted before that time elapses, + * the event loop thread will automatically be unparked, + * allowing the loop to be re-entered from the top to wait for ready events. + *

+ * Note that {@linkplain Thread#sleep(long) sleeping} instead of parking may cause latency spikes, + * so it is not recommended. + *

+ * This method should only be called from the event loop virtual thread. + * + * @param waitTime {@code 0} to return immediately after unparking any ready threads (even if there are none), + * {@code -1} unpark any ready threads or to wait indefinitely for a thread to become ready, + * or any positive integer to unpark any ready threads or to wait for no more than that number of nanoseconds + * @throws InterruptedException if some interruptible operation was interrupted + */ + protected abstract void unparkAny(long waitTime) throws InterruptedException; + + /** + * Forcibly awaken the event loop, if it is currently blocked in {@link #unparkAny(long)}. + * This method may be called from any thread. + */ + protected abstract void wakeup(); +} diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java new file mode 100644 index 00000000..574aa769 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java @@ -0,0 +1,286 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.ConstantBootstraps; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.PriorityQueue; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Predicate; + +import org.jboss.threads.JBossThread; + +/** + * An event loop carrier thread with a built-in executor. + * To acquire the virtual thread of the event loop which is carried by this thread, + * use {@link #virtualThread()}. + */ +public final class EventLoopThread extends JBossThread implements Executor { + private static final VarHandle waitTimeHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "waitTime", VarHandle.class, EventLoopThread.class, long.class); + + // TODO: we could track the currently-mounted thread fairly easily, maybe along with a timestamp + + /** + * The virtual thread which runs the event handler. + */ + private final EventLoopThreadScheduler elts; + /** + * The event loop implementation. + */ + private final EventLoop eventLoop; + /** + * Comparison nanos. + * This is not representative of the current time; rather, it's a somewhat-recent (but arbitrary) + * sample of {@code System.nanoTime()}. + * Update before each queue operation and this queue can run forever, as long as no task waits for more than ~138 years. + * Such tasks might unexpectedly have a different priority. + * But it is not likely to matter at that point. + */ + private long cmpNanos; + /** + * Current nanoseconds. + * This is a snapshot of the current time, taken immediately before draining the delay queue. + */ + private long currentNanos; + /** + * The wait time for the virtual thread side of the event loop. + * This value is handed back and forth between the I/O carrier thread and the event loop virtual thread. + */ + @SuppressWarnings("unused") // waitTimeHandle + private volatile long waitTime = -1; + /** + * The shared/slow task queue, which allows tasks to be enqueued from outside of this thread. + */ + private final LinkedBlockingQueue sq = new LinkedBlockingQueue<>(); + /** + * The task queue. + * The queue is ordered by the amount of time that each entry (thread) has been waiting to run. + */ + private final PriorityQueue q = new PriorityQueue<>(this::compare); + /** + * The bulk remover for transferring {@code sq} to {@code q}. + */ + private final Predicate bulkRemover = q::add; + /** + * The delay queue for timed sleep (patched JDKs only). + */ + private final DelayQueue> dq = new DelayQueue<>(); + private final Dispatcher dispatcher = new EventLoopDispatcher(); + + EventLoopThread(final Scheduler scheduler, final int idx, final Function eventLoopFactory) { + super(() -> {}, "Event loop carrier thread " + idx); + elts = new EventLoopThreadScheduler(scheduler, this, idx); + this.eventLoop = eventLoopFactory.apply(this); + } + + /** + * Create and start a virtual thread which is carried by this I/O thread. + * + * @param runnable the body of the virtual thread (must not be {@code null}) + */ + public void execute(Runnable runnable) { + elts.scheduler().executeOnEventLoop(this, runnable); + } + + /** + * {@return the owner of this event loop thread} + */ + Scheduler owner() { + return elts.scheduler(); + } + + /** + * {@return the event loop for this thread} + */ + public EventLoop eventLoop() { + return eventLoop; + } + + /** + * {@return the virtual thread of the event loop itself} + */ + public Thread virtualThread() { + return elts.virtualThread(); + } + + /** + * {@return the current event loop thread, or {@code null} if the current thread is not mounted on an event loop thread} + */ + public static EventLoopThread current() { + return Thread.currentThread().isVirtual() && Access.currentCarrier() instanceof EventLoopThread elt ? elt : null; + } + + public void interrupt() { + // interruption of I/O carrier threads is not allowed + } + + /** + * Run the carrier side of the event loop. + */ + public void run() { + if (Thread.currentThread() != this || elts.virtualThread().isAlive()) { + throw new IllegalThreadStateException(); + } + elts.start(); + // initialize the wait-time comparison basis + cmpNanos = System.nanoTime(); + for (;;) { + // drain shared queue, hopefully somewhat efficiently + sq.removeIf(bulkRemover); + long waitTime = -1L; + if (!dq.isEmpty()) { + // process the delay queue + currentNanos = System.nanoTime(); + DelayedTask dt = dq.poll(); + while (dt != null) { + // this will indirectly cause entries to be added to `q` + dt.run(); + dt = dq.poll(); + } + dt = dq.peek(); + if (dt != null) { + // set the max wait time to the amount of time before the next scheduled task + waitTime = Math.max(1L, dt.deadline - currentNanos); + } + } + UserThreadScheduler removed = q.poll(); + if (removed == null) { + waitTimeHandle.setOpaque(this, waitTime); + // mark event handler ready + elts.makeReady(); + // call event handler (possibly early) + elts.run(); + } else { + // update for next q operation without hitting nanoTime over and over + cmpNanos = removed.waitingSince(); + removed.run(); + // now, see if we reenter the event loop right away or not + if (elts.ready()) { + waitTimeHandle.setOpaque(this, 0L); + // call event handler + elts.run(); + } + // otherwise, continue processing tasks + } + } + } + + void enqueueFromOutside(final UserThreadScheduler command) { + sq.add(command); + eventLoop().wakeup(); + } + + void enqueueLocal(final UserThreadScheduler command) { + assert Thread.currentThread() == this || Access.currentCarrier() == this; + q.add(command); + } + + /** + * Compare the wait times of two tasks. + * The task that has waited for the longest time is considered earlier than the task that has a shorter wait time. + * + * @param o1 the first thread scheduler (must not be {@code null}) + * @param o2 the second thread scheduler (must not be {@code null}) + * @return the comparison result + */ + private int compare(UserThreadScheduler o1, UserThreadScheduler o2) { + long cmpNanos = this.cmpNanos; + return Long.compare(o2.waitTime(cmpNanos), o1.waitTime(cmpNanos)); + } + + ScheduledFuture schedule(final Runnable command, final long nanos) { + Thread ct = Thread.currentThread(); + if (ct == this || ct.isVirtual() && Access.currentCarrier() == this) { + DelayedTask task = new DelayedTask<>(command, System.nanoTime() + nanos); + dq.add(task); + return task; + } else { + // not expected + throw new IllegalStateException(); + } + } + + Dispatcher dispatcher() { + return dispatcher; + } + + long waitTime() { + return (long) waitTimeHandle.getOpaque(this); + } + + private final class DelayedTask implements ScheduledFuture, Runnable { + private final Runnable task; + private final long deadline; + + private DelayedTask(final Runnable task, final long deadline) { + this.task = task; + this.deadline = deadline; + } + + public long getDelay(final TimeUnit unit) { + long currentNanos = EventLoopThread.this.currentNanos; + return unit.convert(deadline - currentNanos, TimeUnit.NANOSECONDS); + } + + public int compareTo(final Delayed o) { + if (o instanceof DelayedTask dt) { + long currentNanos = EventLoopThread.this.currentNanos; + return Long.compare(deadline - currentNanos, dt.deadline - currentNanos); + } else { + throw new IllegalStateException(); + } + } + + public boolean cancel(final boolean mayInterruptIfRunning) { + // unsupported + return false; + } + + public boolean isCancelled() { + // unsupported + return false; + } + + public boolean isDone() { + // unsupported + return false; + } + + public V get() { + throw new UnsupportedOperationException(); + } + + public V get(final long timeout, final TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public void run() { + task.run(); + } + } + + private class EventLoopDispatcher extends Dispatcher { + void execute(final UserThreadScheduler continuation) { + Thread ct = Thread.currentThread(); + if (ct == EventLoopThread.this || Access.currentCarrier() == EventLoopThread.this) { + enqueueLocal(continuation); + } else { + enqueueFromOutside(continuation); + } + } + + ScheduledFuture schedule(final Runnable task, final long nanos) { + // it is expected that this will only be called locally + assert Thread.currentThread() == EventLoopThread.this; + DelayedTask dt = new DelayedTask<>(task, System.nanoTime() + nanos); + dq.add(dt); + return dt; + } + } +} + diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java new file mode 100644 index 00000000..b1d90de2 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java @@ -0,0 +1,76 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.ConstantBootstraps; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * The thread scheduler for an event loop thread. + */ +final class EventLoopThreadScheduler extends ThreadScheduler { + private static final VarHandle readyHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "ready", VarHandle.class, EventLoopThreadScheduler.class, boolean.class); + + private final EventLoopThread eventLoopThread; + @SuppressWarnings("unused") // readyHandle + private boolean ready; + + EventLoopThreadScheduler(final Scheduler scheduler, final EventLoopThread eventLoopThread, final long idx) { + super(scheduler, "Event loop", idx); + this.eventLoopThread = eventLoopThread; + } + + boolean ready() { + return (boolean) readyHandle.getOpaque(this); + } + + void makeReady() { + readyHandle.setOpaque(this, true); + LockSupport.unpark(virtualThread()); + } + + void runThreadBody() { + // this runs on the *virtual* event loop thread + EventLoopThread eventLoopThread = this.eventLoopThread; + final EventLoop eventLoop = eventLoopThread.eventLoop(); + long waitTime; + for (;;) { + // clear the unpark permit + LockSupport.unpark(Thread.currentThread()); + LockSupport.park(); + // clear interrupt status + Thread.interrupted(); + // call the event loop + waitTime = eventLoopThread.waitTime(); + try { + eventLoop.unparkAny(waitTime); + } catch (Throwable ignored) { + } + // avoid starvation + if (! yielded()) { + Thread.yield(); + // yielding sets the flag to true, so clear it again + yielded(); + } + } + } + + public void run() { + readyHandle.setOpaque(this, false); + super.run(); + } + + void start() { + super.start(); + } + + public void execute(final Runnable command) { + readyHandle.setOpaque(this, true); + } + + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + return eventLoopThread.schedule(command, unit.convert(delay, TimeUnit.NANOSECONDS)); + } +} diff --git a/src/main/java/org/jboss/threads/virtual/Mode.java b/src/main/java/org/jboss/threads/virtual/Mode.java new file mode 100644 index 00000000..4a5ce672 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Mode.java @@ -0,0 +1,16 @@ +package org.jboss.threads.virtual; + +/** + * The virtual thread scheduling mode. + */ +enum Mode { + /** + * Schedule on to the I/O thread. + */ + IO, + /** + * Schedule on to the worker thread. + */ + WORKER, + ; +} diff --git a/src/main/java/org/jboss/threads/virtual/Scheduler.java b/src/main/java/org/jboss/threads/virtual/Scheduler.java new file mode 100644 index 00000000..de9aebfd --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Scheduler.java @@ -0,0 +1,300 @@ +package org.jboss.threads.virtual; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.LockSupport; +import java.util.function.Function; +import java.util.stream.Stream; + +import io.smallrye.common.annotation.Experimental; +import io.smallrye.common.constraint.Assert; +import jdk.internal.vm.ThreadContainer; +import org.jboss.threads.EnhancedQueueExecutor; + +/** + * A virtual thread scheduler. + */ +@Experimental("Experimental virtual thread support") +public final class Scheduler implements Executor { + private final EnhancedQueueExecutor blockingPool; + private final List eventLoopThreads = new CopyOnWriteArrayList<>(); + private final Container container = new Container(); + private final AtomicInteger eventLoopIdx = new AtomicInteger(1); + private final AtomicLong threadIdx = new AtomicLong(1); + private final PoolDispatcher poolDispatcher = new PoolDispatcher(); + + /** + * Construct a new instance. + * + * @param blockingPool the blocking pool to use (must not be {@code null}) + */ + Scheduler(final EnhancedQueueExecutor blockingPool) { + this.blockingPool = Assert.checkNotNullParam("blockingPool", blockingPool); + } + + /** + * Construct and start a new event loop thread. + * + * @param eventLoopFactory the event loop factory (must not be {@code null}) + * @return the new event loop thread (not {@code null}) + * @throws NullPointerException if the factory returned a {@code null} event loop + */ + public EventLoopThread newEventLoopThread(Function eventLoopFactory) { + Assert.checkNotNullParam("eventLoopFactory", eventLoopFactory); + EventLoopThread eventLoopThread = new EventLoopThread(this, eventLoopIdx.getAndIncrement(), eventLoopFactory); + eventLoopThread.start(); + return eventLoopThread; + } + + /** + * Construct a new instance. + * + * @param blockingPool the blocking pool to use (must not be {@code null}) + * @return the new scheduler (not {@code null}) + */ + public static Scheduler create(EnhancedQueueExecutor blockingPool) { + return new Scheduler(Assert.checkNotNullParam("blockingPool", blockingPool)); + } + + /** + * Execute the given task in a new virtual thread managed by this scheduler, initially scheduled as a worker + * (CPU-bound) task. + * + * @param command the runnable task + */ + public void execute(final Runnable command) { + new UserThreadScheduler(this, Assert.checkNotNullParam("command", command), threadIdx.getAndIncrement()).start(); + } + + void executeOnEventLoop(final EventLoopThread eventLoopThread, final Runnable command) { + new UserThreadScheduler(this, command, threadIdx.getAndIncrement(), eventLoopThread).start(); + } + + /** + * Indicate that the current thread is going to be performing I/O-intensive operations + * with relatively little CPU or native usage. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + */ + public static void resumeOn(EventLoopThread eventLoopThread) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.resumeOn(eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Indicate that the current thread is going to be performing native or CPU-intensive operations + * with relatively little I/O usage. + * After this method returns, the current thread will be carried by a pool thread. + */ + public static void resumeOnPool() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.resumeOn(ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @see LockSupport#park() + */ + public static void parkAndResumeOn(EventLoopThread eventLoopThread) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkAndResumeOn(null, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param blocker the blocker object to register + * @see LockSupport#park(Object) + */ + public static void parkAndResumeOn(EventLoopThread eventLoopThread, Object blocker) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkAndResumeOn(blocker, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * @see LockSupport#park() + */ + public static void parkAndResumeOnPool() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkAndResumeOn(null, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param blocker the blocker object to register + * @see LockSupport#park(Object) + */ + public static void parkAndResumeOnPool(Object blocker) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkAndResumeOn(blocker, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(long) + */ + public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, long nanos) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkNanosAndResumeOn(null, nanos, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param blocker the blocker object to register (see {@link LockSupport#park(Object)}) + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(Object, long) + */ + public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, Object blocker, long nanos) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkNanosAndResumeOn(blocker, nanos, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(long) + */ + public static void parkNanosAndResumeOnPool(long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkNanosAndResumeOn(null, nanos, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param blocker the blocker object to register + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(Object, long) + */ + public static void parkNanosAndResumeOnPool(Object blocker, long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkNanosAndResumeOn(blocker, nanos, ts.scheduler().poolDispatcher()); + } + } + + EnhancedQueueExecutor blockingPool() { + return blockingPool; + } + + List eventLoopThreads() { + return eventLoopThreads; + } + + ThreadContainer container() { + return container; + } + + Dispatcher poolDispatcher() { + return poolDispatcher; + } + + static final class Container extends ThreadContainer { + static final boolean DEBUG = false; + final Set threads = ConcurrentHashMap.newKeySet(); + + private Container() { + super(false); + } + + public void onStart(final Thread thread) { + // todo: track shutdown + if (DEBUG) threads.add(thread); + } + + public void onExit(final Thread thread) { + // todo: track shutdown + if (DEBUG) threads.remove(thread); + } + + public String name() { + return "managed"; + } + + public long threadCount() { + return threads.size(); + } + + protected boolean tryClose() { + return super.tryClose(); + } + + public Thread owner() { + return super.owner(); + } + + public Stream threads() { + return DEBUG ? threads.stream() : Stream.empty(); + } + } + + private class PoolDispatcher extends Dispatcher { + void execute(final UserThreadScheduler continuation) { + blockingPool.execute(continuation); + } + + ScheduledFuture schedule(final Runnable task, final long nanos) { + return blockingPool.schedule(task, nanos, TimeUnit.NANOSECONDS); + } + } +} diff --git a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java new file mode 100644 index 00000000..e8d90931 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java @@ -0,0 +1,157 @@ +package org.jboss.threads.virtual; + +import java.lang.invoke.ConstantBootstraps; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * The base class for an individual thread's scheduler. + */ +abstract class ThreadScheduler implements ScheduledExecutorService, Runnable { + private static final VarHandle yieldedHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "yielded", VarHandle.class, ThreadScheduler.class, boolean.class); + /** + * Indicate if this thread parked or yielded, to detect a starvation scenario. + */ + @SuppressWarnings("unused") // yieldedHandle + volatile boolean yielded; + + /** + * The owning scheduler. + */ + private final Scheduler scheduler; + /** + * The virtual thread associated with this task. + */ + private final Thread virtualThread; + + ThreadScheduler(final Scheduler scheduler, final String name, final long idx) { + this.scheduler = scheduler; + this.virtualThread = Access.threadBuilder(this).inheritInheritableThreadLocals(false).name(name, idx).unstarted(this::runThreadBody); + } + + void start() { + Access.startThread(virtualThread, scheduler.container()); + } + + abstract void runThreadBody(); + + /** + * Run the continuation for the current thread. + */ + public void run() { + assert ! Thread.currentThread().isVirtual(); + try { + Access.continuationOf(virtualThread).run(); + } finally { + yieldedHandle.setOpaque(this, true); + } + } + + /** + * {@return {@code true} if this scheduler has yielded since the last invocation of this method, or {@code false} otherwise} + */ + boolean yielded() { + boolean yielded = (boolean) yieldedHandle.getOpaque(this); + if (yielded) { + yieldedHandle.setOpaque(this, false); + } + return yielded; + } + + /** + * {@return the overall scheduler for this thread scheduler} + */ + Scheduler scheduler() { + return scheduler; + } + + /** + * {@return the virtual thread associated with this thread scheduler} + */ + Thread virtualThread() { + return virtualThread; + } + + /** + * Schedule the given command, which may be the continuation for this thread. + * + * @param command the command (must not be {@code null}) + */ + public abstract void execute(Runnable command); + + /** + * Schedule the given command, which is generally an unpark request for some other sleeping thread. + * + * @param command the command (must not be {@code null}) + */ + public abstract ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit); + + // unimplemented methods + + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public void shutdown() { + throw new UnsupportedOperationException(); + } + + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public Future submit(Callable task) { + throw new UnsupportedOperationException(); + } + + public Future submit(Runnable task, T result) { + throw new UnsupportedOperationException(); + } + + public Future submit(Runnable task) { + throw new UnsupportedOperationException(); + } + + public List> invokeAll(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + public T invokeAny(Collection> tasks) { + throw new UnsupportedOperationException(); + } + + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); + } +} diff --git a/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java new file mode 100644 index 00000000..192e5597 --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java @@ -0,0 +1,125 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * A scheduler for a specific thread. + */ +final class UserThreadScheduler extends ThreadScheduler { + /** + * The system nanos time when this task started waiting. + */ + private long waitingSince; + /** + * The current scheduling executor. + */ + private Dispatcher dispatcher; + /** + * The user task. + */ + private Runnable task; + + UserThreadScheduler(final Scheduler scheduler, final Runnable task, final long idx) { + this(scheduler, task, scheduler.poolDispatcher(), idx); + } + + UserThreadScheduler(final Scheduler scheduler, final Runnable task, final long idx, final EventLoopThread eventLoopThread) { + this(scheduler, task, eventLoopThread.dispatcher(), idx); + } + + private UserThreadScheduler(final Scheduler scheduler, final Runnable task, final Dispatcher dispatcher, final long idx) { + super(scheduler, "User thread", idx); + this.dispatcher = dispatcher; + this.task = task; + } + + void runThreadBody() { + Runnable task = this.task; + // release the reference + this.task = null; + task.run(); + } + + /** + * Run the continuation for the current thread. + */ + public void run() { + try { + super.run(); + } finally { + waitingSince = System.nanoTime(); + } + } + + public void execute(Runnable command) { + if (command == Access.continuationOf(virtualThread())) { + dispatcher.execute(this); + } else { + // we can only execute our continuation + throw new IllegalStateException(); + } + } + + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + // command is going to be VirtualThread::unpark or similar (runnable from carrier thread) + return dispatcher.schedule(command, unit.toNanos(delay)); + } + + /** + * {@return the wait time of this thread in nanos} + * @param current the current time + */ + long waitTime(long current) { + return current - waitingSince; + } + + void resumeOn(final Dispatcher dispatcher) { + if (dispatcher != this.dispatcher) { + this.dispatcher = dispatcher; + Thread.yield(); + } + } + + void parkAndResumeOn(final Object blocker, final Dispatcher dispatcher) { + if (dispatcher != this.dispatcher) { + this.dispatcher = dispatcher; + // clear yielded flag + yielded(); + if (blocker == null) { + LockSupport.park(); + } else { + LockSupport.park(blocker); + } + if (! yielded()) { + // park didn't block, so manually reschedule + Thread.yield(); + } + } else { + if (blocker == null) { + LockSupport.park(); + } else { + LockSupport.park(blocker); + } + } + } + + void parkNanosAndResumeOn(final Object blocker, final long nanos, final Dispatcher dispatcher) { + if (dispatcher != this.dispatcher) { + this.dispatcher = dispatcher; + // todo: we have to yield now so that we don't end up calling `schedule` from the wrong thread + // we can optimize this by examining the dispatcher or by synchronizing the queue somehow + Thread.yield(); + } + if (blocker == null) { + LockSupport.parkNanos(nanos); + } else { + LockSupport.parkNanos(blocker, nanos); + } + } + + long waitingSince() { + return waitingSince; + } +} diff --git a/src/main/java/org/jboss/threads/virtual/package-info.java b/src/main/java/org/jboss/threads/virtual/package-info.java new file mode 100644 index 00000000..fa15017c --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/package-info.java @@ -0,0 +1,7 @@ +/** + * Support for advanced virtual thread scheduling. + */ +@Experimental("Experimental virtual thread support") +package org.jboss.threads.virtual; + +import io.smallrye.common.annotation.Experimental; \ No newline at end of file From adb9c3f3ba3bb497d804f7ba424c97d41a11c4ef Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Wed, 19 Feb 2025 13:06:33 -0600 Subject: [PATCH 2/8] Add the forbidden thread pool --- .../threads/virtual/EventLoopThread.java | 66 +++++++++++++++++++ .../virtual/EventLoopThreadScheduler.java | 3 +- .../java/org/jboss/threads/virtual/Util.java | 17 +++++ 3 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/jboss/threads/virtual/Util.java diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java index 574aa769..3c7149aa 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java @@ -4,15 +4,18 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.function.Function; import java.util.function.Predicate; +import io.smallrye.common.annotation.Experimental; import org.jboss.threads.JBossThread; /** @@ -87,6 +90,66 @@ public void execute(Runnable runnable) { elts.scheduler().executeOnEventLoop(this, runnable); } + /** + * {@return a new executor service which pools threads and schedules tasks to this event loop} + * Executed tasks are free to change to another event loop or the shared pool. + * Tasks are always started with an association to this event loop. + */ + @Experimental("Pooled event-loop-bound threads") + public Executor newPool() { + return new Executor() { + static final VarHandle taskHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "task", VarHandle.class, Runner.class, Runnable.class); + class Runner implements Runnable { + + private volatile Thread thread; + @SuppressWarnings("unused") // taskHandle + private volatile Runnable task; + + Runner(final Runnable task) { + this.task = task; + } + + public void run() { + thread = Thread.currentThread(); + for (;;) { + // get and run next task + Runnable task = (Runnable) taskHandle.getAndSet(this, null); + if (task != null) { + try { + task.run(); + } catch (Throwable ignored) {} + Util.clearUnpark(); + // re-add to queue /after/ clearing park permit + q.addFirst(this); + } + // wait for a new task + Scheduler.parkAndResumeOn(EventLoopThread.this); + } + } + } + + private final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque<>(); + + public void execute(final Runnable command) { + Runner runner; + for (;;) { + runner = q.poll(); + if (runner == null) { + // don't wait around, just start a new thread immediately + runner = new Runner(command); + EventLoopThread.this.execute(runner); + return; + } + // try to set the task + if (taskHandle.compareAndSet(runner, null, command)) { + LockSupport.unpark(runner.thread); + return; + } + } + } + }; + } + /** * {@return the owner of this event loop thread} */ @@ -121,6 +184,9 @@ public void interrupt() { /** * Run the carrier side of the event loop. + * This should only be called by {@code Thread.start()}. + * + * @throws IllegalThreadStateException if called inappropriately */ public void run() { if (Thread.currentThread() != this || elts.virtualThread().isAlive()) { diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java index b1d90de2..f6761177 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java @@ -38,8 +38,7 @@ void runThreadBody() { long waitTime; for (;;) { // clear the unpark permit - LockSupport.unpark(Thread.currentThread()); - LockSupport.park(); + Util.clearUnpark(); // clear interrupt status Thread.interrupted(); // call the event loop diff --git a/src/main/java/org/jboss/threads/virtual/Util.java b/src/main/java/org/jboss/threads/virtual/Util.java new file mode 100644 index 00000000..093d31ce --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/Util.java @@ -0,0 +1,17 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.locks.LockSupport; + +/** + * Utilities. + */ +final class Util { + private Util() {} + + static void clearUnpark() { + // change unpark permit from (0 or 1) to (1) + LockSupport.unpark(Thread.currentThread()); + // change unpark permit from (1) to (0) + LockSupport.park(); + } +} From 9888d1f2b11c4c7b7a39a5e1fd0390ce9460ae9f Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Wed, 19 Feb 2025 14:03:56 -0600 Subject: [PATCH 3/8] Remove unused class --- .../java/org/jboss/threads/virtual/Mode.java | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 src/main/java/org/jboss/threads/virtual/Mode.java diff --git a/src/main/java/org/jboss/threads/virtual/Mode.java b/src/main/java/org/jboss/threads/virtual/Mode.java deleted file mode 100644 index 4a5ce672..00000000 --- a/src/main/java/org/jboss/threads/virtual/Mode.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.jboss.threads.virtual; - -/** - * The virtual thread scheduling mode. - */ -enum Mode { - /** - * Schedule on to the I/O thread. - */ - IO, - /** - * Schedule on to the worker thread. - */ - WORKER, - ; -} From 41085e1a597a60cd555eb7793df6fd2f3b7e5de7 Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Thu, 20 Feb 2025 14:41:00 -0600 Subject: [PATCH 4/8] Fixes for scheduling behaviors of Java 25+ --- .../org/jboss/threads/virtual/Access.java | 18 ++-- .../org/jboss/threads/virtual/EventLoop.java | 10 +- .../threads/virtual/EventLoopThread.java | 95 +++++++------------ .../virtual/EventLoopThreadScheduler.java | 29 +++--- .../org/jboss/threads/virtual/Scheduler.java | 23 +++-- .../threads/virtual/ThreadScheduler.java | 32 +++++++ .../threads/virtual/UserThreadScheduler.java | 27 ------ 7 files changed, 104 insertions(+), 130 deletions(-) diff --git a/src/main/java/org/jboss/threads/virtual/Access.java b/src/main/java/org/jboss/threads/virtual/Access.java index 158bed30..99ba9a4d 100644 --- a/src/main/java/org/jboss/threads/virtual/Access.java +++ b/src/main/java/org/jboss/threads/virtual/Access.java @@ -30,10 +30,10 @@ final class Access { ct = thr.findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class)); Class vtbClass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder", false, null); try { - vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, Executor.class)); - } catch (NoSuchMethodError ignored) { - // patched JDK vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, ScheduledExecutorService.class)); + } catch (NoSuchMethodException | NoSuchMethodError ignored) { + // unpatched JDK + vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, Executor.class)); } // create efficient transformer vtf = vtf.asType(MethodType.methodType(Thread.Builder.OfVirtual.class, ThreadScheduler.class)); @@ -42,7 +42,13 @@ final class Access { tswc = thr.findVirtual(Thread.class, "start", MethodType.methodType(void.class, ThreadContainer.class)); Class vtc = thr.findClass("java.lang.VirtualThread"); MethodHandles.Lookup vthr = MethodHandles.privateLookupIn(vtc, MethodHandles.lookup()); - sg = vthr.findGetter(vtc, "scheduler", Executor.class).asType(MethodType.methodType(Executor.class, Thread.class)); + try { + sg = vthr.findGetter(vtc, "scheduler", ScheduledExecutorService.class); + } catch (NoSuchFieldException | NoSuchFieldError ignored) { + // unpatched JDK + sg = vthr.findGetter(vtc, "scheduler", Executor.class); + } + sg = sg.asType(MethodType.methodType(Executor.class, Thread.class)); cg = vthr.findGetter(vtc, "runContinuation", Runnable.class).asType(MethodType.methodType(Runnable.class, Thread.class)); } catch (Throwable e) { // no good @@ -85,9 +91,9 @@ static void startThread(Thread thread, ThreadContainer threadContainer) { } } - static ThreadScheduler schedulerOf(Thread thread) { + static Executor schedulerOf(Thread thread) { try { - return (ThreadScheduler) (Executor) schedulerGetter.invokeExact(thread); + return (Executor) schedulerGetter.invokeExact(thread); } catch (RuntimeException | Error e) { throw e; } catch (Throwable e) { diff --git a/src/main/java/org/jboss/threads/virtual/EventLoop.java b/src/main/java/org/jboss/threads/virtual/EventLoop.java index 1e80c07d..4c30fab4 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoop.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoop.java @@ -1,7 +1,5 @@ package org.jboss.threads.virtual; -import java.util.concurrent.locks.LockSupport; - import io.smallrye.common.annotation.Experimental; /** @@ -28,12 +26,8 @@ protected EventLoop() {} * This method will be called in a loop (the event loop, in fact). * After each invocation of this method, up to one other waiting thread will be continued. * Since this generally would lead to busy-looping, - * the implementation of this method should {@linkplain LockSupport#parkNanos(long) park} for some amount of time before returning. - * While the event loop method is parked, - * other threads will be allowed to run. - * If the set of ready threads is exhausted before that time elapses, - * the event loop thread will automatically be unparked, - * allowing the loop to be re-entered from the top to wait for ready events. + * the implementation of this method should + * {@linkplain Scheduler#yieldNanos(long) yield for some amount of time} before returning to allow other threads to run. *

* Note that {@linkplain Thread#sleep(long) sleeping} instead of parking may cause latency spikes, * so it is not recommended. diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java index 3c7149aa..97f50ad1 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java @@ -24,10 +24,6 @@ * use {@link #virtualThread()}. */ public final class EventLoopThread extends JBossThread implements Executor { - private static final VarHandle waitTimeHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "waitTime", VarHandle.class, EventLoopThread.class, long.class); - - // TODO: we could track the currently-mounted thread fairly easily, maybe along with a timestamp - /** * The virtual thread which runs the event handler. */ @@ -50,25 +46,19 @@ public final class EventLoopThread extends JBossThread implements Executor { * This is a snapshot of the current time, taken immediately before draining the delay queue. */ private long currentNanos; - /** - * The wait time for the virtual thread side of the event loop. - * This value is handed back and forth between the I/O carrier thread and the event loop virtual thread. - */ - @SuppressWarnings("unused") // waitTimeHandle - private volatile long waitTime = -1; /** * The shared/slow task queue, which allows tasks to be enqueued from outside of this thread. */ - private final LinkedBlockingQueue sq = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue sq = new LinkedBlockingQueue<>(); /** * The task queue. * The queue is ordered by the amount of time that each entry (thread) has been waiting to run. */ - private final PriorityQueue q = new PriorityQueue<>(this::compare); + private final PriorityQueue q = new PriorityQueue<>(this::compare); /** * The bulk remover for transferring {@code sq} to {@code q}. */ - private final Predicate bulkRemover = q::add; + private final Predicate bulkRemover = q::add; /** * The delay queue for timed sleep (patched JDKs only). */ @@ -214,36 +204,31 @@ public void run() { waitTime = Math.max(1L, dt.deadline - currentNanos); } } - UserThreadScheduler removed = q.poll(); + ThreadScheduler removed = q.poll(); if (removed == null) { - waitTimeHandle.setOpaque(this, waitTime); - // mark event handler ready - elts.makeReady(); - // call event handler (possibly early) - elts.run(); + // all threads are parked, even the event loop, so just wait around + LockSupport.park(); } else { + if (removed == elts) { + // configure the wait time + elts.setWaitTime(waitTime); + } // update for next q operation without hitting nanoTime over and over - cmpNanos = removed.waitingSince(); + cmpNanos = removed.waitingSinceTime(); removed.run(); - // now, see if we reenter the event loop right away or not - if (elts.ready()) { - waitTimeHandle.setOpaque(this, 0L); - // call event handler - elts.run(); - } - // otherwise, continue processing tasks } } } - void enqueueFromOutside(final UserThreadScheduler command) { - sq.add(command); - eventLoop().wakeup(); - } - - void enqueueLocal(final UserThreadScheduler command) { - assert Thread.currentThread() == this || Access.currentCarrier() == this; - q.add(command); + void enqueue(final ThreadScheduler continuation) { + Thread ct = Thread.currentThread(); + if (ct == this || Access.currentCarrier() == this) { + q.add(continuation); + } else { + sq.add(continuation); + LockSupport.unpark(this); + eventLoop().wakeup(); + } } /** @@ -254,31 +239,24 @@ void enqueueLocal(final UserThreadScheduler command) { * @param o2 the second thread scheduler (must not be {@code null}) * @return the comparison result */ - private int compare(UserThreadScheduler o1, UserThreadScheduler o2) { + private int compare(ThreadScheduler o1, ThreadScheduler o2) { long cmpNanos = this.cmpNanos; - return Long.compare(o2.waitTime(cmpNanos), o1.waitTime(cmpNanos)); + return Long.compare(o2.waitingSince(cmpNanos), o1.waitingSince(cmpNanos)); } ScheduledFuture schedule(final Runnable command, final long nanos) { + // it is expected that this will only be called locally Thread ct = Thread.currentThread(); - if (ct == this || ct.isVirtual() && Access.currentCarrier() == this) { - DelayedTask task = new DelayedTask<>(command, System.nanoTime() + nanos); - dq.add(task); - return task; - } else { - // not expected - throw new IllegalStateException(); - } + assert ct == EventLoopThread.this; + DelayedTask dt = new DelayedTask<>(command, System.nanoTime() + nanos); + dq.add(dt); + return dt; } Dispatcher dispatcher() { return dispatcher; } - long waitTime() { - return (long) waitTimeHandle.getOpaque(this); - } - private final class DelayedTask implements ScheduledFuture, Runnable { private final Runnable task; private final long deadline; @@ -303,7 +281,11 @@ public int compareTo(final Delayed o) { } public boolean cancel(final boolean mayInterruptIfRunning) { - // unsupported + Thread ct = Thread.currentThread(); + if (ct == EventLoopThread.this || ct.isVirtual() && Access.currentCarrier() == EventLoopThread.this) { + return dq.remove(this); + } + // else unsupported return false; } @@ -332,20 +314,11 @@ public void run() { private class EventLoopDispatcher extends Dispatcher { void execute(final UserThreadScheduler continuation) { - Thread ct = Thread.currentThread(); - if (ct == EventLoopThread.this || Access.currentCarrier() == EventLoopThread.this) { - enqueueLocal(continuation); - } else { - enqueueFromOutside(continuation); - } + enqueue(continuation); } ScheduledFuture schedule(final Runnable task, final long nanos) { - // it is expected that this will only be called locally - assert Thread.currentThread() == EventLoopThread.this; - DelayedTask dt = new DelayedTask<>(task, System.nanoTime() + nanos); - dq.add(dt); - return dt; + return EventLoopThread.this.schedule(task, nanos); } } } diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java index f6761177..fa74940b 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java @@ -5,32 +5,26 @@ import java.lang.invoke.VarHandle; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; /** * The thread scheduler for an event loop thread. */ final class EventLoopThreadScheduler extends ThreadScheduler { - private static final VarHandle readyHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "ready", VarHandle.class, EventLoopThreadScheduler.class, boolean.class); + private static final VarHandle waitTimeHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "waitTime", VarHandle.class, EventLoopThreadScheduler.class, long.class); private final EventLoopThread eventLoopThread; - @SuppressWarnings("unused") // readyHandle - private boolean ready; + /** + * The wait time for the virtual thread side of the event loop. + * This value is handed back and forth between the I/O carrier thread and the event loop virtual thread. + */ + @SuppressWarnings("unused") // waitTimeHandle + private volatile long waitTime = -1; EventLoopThreadScheduler(final Scheduler scheduler, final EventLoopThread eventLoopThread, final long idx) { super(scheduler, "Event loop", idx); this.eventLoopThread = eventLoopThread; } - boolean ready() { - return (boolean) readyHandle.getOpaque(this); - } - - void makeReady() { - readyHandle.setOpaque(this, true); - LockSupport.unpark(virtualThread()); - } - void runThreadBody() { // this runs on the *virtual* event loop thread EventLoopThread eventLoopThread = this.eventLoopThread; @@ -42,7 +36,7 @@ void runThreadBody() { // clear interrupt status Thread.interrupted(); // call the event loop - waitTime = eventLoopThread.waitTime(); + waitTime = (long) waitTimeHandle.getOpaque(this); try { eventLoop.unparkAny(waitTime); } catch (Throwable ignored) { @@ -56,9 +50,8 @@ void runThreadBody() { } } - public void run() { - readyHandle.setOpaque(this, false); - super.run(); + void setWaitTime(long nanos) { + waitTimeHandle.setOpaque(this, nanos); } void start() { @@ -66,7 +59,7 @@ void start() { } public void execute(final Runnable command) { - readyHandle.setOpaque(this, true); + eventLoopThread.enqueue(this); } public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { diff --git a/src/main/java/org/jboss/threads/virtual/Scheduler.java b/src/main/java/org/jboss/threads/virtual/Scheduler.java index de9aebfd..7555e3f4 100644 --- a/src/main/java/org/jboss/threads/virtual/Scheduler.java +++ b/src/main/java/org/jboss/threads/virtual/Scheduler.java @@ -1,9 +1,7 @@ package org.jboss.threads.virtual; -import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -24,7 +22,6 @@ @Experimental("Experimental virtual thread support") public final class Scheduler implements Executor { private final EnhancedQueueExecutor blockingPool; - private final List eventLoopThreads = new CopyOnWriteArrayList<>(); private final Container container = new Container(); private final AtomicInteger eventLoopIdx = new AtomicInteger(1); private final AtomicLong threadIdx = new AtomicLong(1); @@ -233,12 +230,18 @@ public static void parkNanosAndResumeOnPool(Object blocker, long nanos) { } } - EnhancedQueueExecutor blockingPool() { - return blockingPool; - } - - List eventLoopThreads() { - return eventLoopThreads; + /** + * Yield execution to any task is already waiting or will start waiting within the next {@code nanos} nanoseconds. + * If no tasks remain within the given criteria, the current thread will resume. + * + * @param nanos the number of nanoseconds to attempt to yield for + */ + public static void yieldNanos(long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + ts.delayBy(Math.max(0, nanos)); + } + Thread.yield(); } ThreadContainer container() { @@ -254,7 +257,7 @@ static final class Container extends ThreadContainer { final Set threads = ConcurrentHashMap.newKeySet(); private Container() { - super(false); + super(true); } public void onStart(final Thread thread) { diff --git a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java index e8d90931..aafd9206 100644 --- a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java @@ -16,8 +16,21 @@ */ abstract class ThreadScheduler implements ScheduledExecutorService, Runnable { private static final VarHandle yieldedHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "yielded", VarHandle.class, ThreadScheduler.class, boolean.class); + private static final VarHandle delayHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "delay", VarHandle.class, ThreadScheduler.class, long.class); + /** + * The system nanos time when this task started waiting. + * Accessed only by carrier threads. + */ + private long waitingSinceTime; + /** + * The nanosecond delay time for scheduling. + * Accessed by carrier threads and virtual threads. + */ + @SuppressWarnings("unused") // delayHandle + private long delay; /** * Indicate if this thread parked or yielded, to detect a starvation scenario. + * Accessed by carrier threads and virtual threads. */ @SuppressWarnings("unused") // yieldedHandle volatile boolean yielded; @@ -42,14 +55,33 @@ void start() { abstract void runThreadBody(); + void delayBy(long nanos) { + delayHandle.setOpaque(this, nanos); + } + + long waitingSinceTime() { + return waitingSinceTime; + } + + /** + * {@return the wait time of this thread in nanos} + * @param current the current time + */ + long waitingSince(long current) { + return current - waitingSinceTime - (long) delayHandle.getOpaque(this); + } + /** * Run the continuation for the current thread. */ public void run() { assert ! Thread.currentThread().isVirtual(); + // reset delay + delayBy(0); try { Access.continuationOf(virtualThread).run(); } finally { + waitingSinceTime = System.nanoTime(); yieldedHandle.setOpaque(this, true); } } diff --git a/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java index 192e5597..0378873c 100644 --- a/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java @@ -8,10 +8,6 @@ * A scheduler for a specific thread. */ final class UserThreadScheduler extends ThreadScheduler { - /** - * The system nanos time when this task started waiting. - */ - private long waitingSince; /** * The current scheduling executor. */ @@ -42,17 +38,6 @@ void runThreadBody() { task.run(); } - /** - * Run the continuation for the current thread. - */ - public void run() { - try { - super.run(); - } finally { - waitingSince = System.nanoTime(); - } - } - public void execute(Runnable command) { if (command == Access.continuationOf(virtualThread())) { dispatcher.execute(this); @@ -67,14 +52,6 @@ public ScheduledFuture schedule(final Runnable command, final long delay, fin return dispatcher.schedule(command, unit.toNanos(delay)); } - /** - * {@return the wait time of this thread in nanos} - * @param current the current time - */ - long waitTime(long current) { - return current - waitingSince; - } - void resumeOn(final Dispatcher dispatcher) { if (dispatcher != this.dispatcher) { this.dispatcher = dispatcher; @@ -118,8 +95,4 @@ void parkNanosAndResumeOn(final Object blocker, final long nanos, final Dispatch LockSupport.parkNanos(blocker, nanos); } } - - long waitingSince() { - return waitingSince; - } } From ea072f0bd1b3d40823688bd193e013ee0af1b8eb Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Thu, 20 Feb 2025 16:41:27 -0600 Subject: [PATCH 5/8] Add priority mechanism and clean ups --- .../threads/virtual/EventLoopThread.java | 8 +-- .../virtual/EventLoopThreadScheduler.java | 8 ++- .../org/jboss/threads/virtual/Scheduler.java | 54 ++++++++++++++++--- .../threads/virtual/ThreadScheduler.java | 52 +++++++++++++++++- .../threads/virtual/UserThreadScheduler.java | 2 +- 5 files changed, 107 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java index 97f50ad1..657890ff 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java @@ -12,7 +12,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; -import java.util.function.Function; import java.util.function.Predicate; import io.smallrye.common.annotation.Experimental; @@ -63,12 +62,15 @@ public final class EventLoopThread extends JBossThread implements Executor { * The delay queue for timed sleep (patched JDKs only). */ private final DelayQueue> dq = new DelayQueue<>(); + /** + * The event loop's dispatcher. + */ private final Dispatcher dispatcher = new EventLoopDispatcher(); - EventLoopThread(final Scheduler scheduler, final int idx, final Function eventLoopFactory) { + EventLoopThread(final Scheduler scheduler, final int idx, final EventLoop eventLoop) { super(() -> {}, "Event loop carrier thread " + idx); elts = new EventLoopThreadScheduler(scheduler, this, idx); - this.eventLoop = eventLoopFactory.apply(this); + this.eventLoop = eventLoop; } /** diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java index fa74940b..12d1e216 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThreadScheduler.java @@ -21,8 +21,10 @@ final class EventLoopThreadScheduler extends ThreadScheduler { private volatile long waitTime = -1; EventLoopThreadScheduler(final Scheduler scheduler, final EventLoopThread eventLoopThread, final long idx) { - super(scheduler, "Event loop", idx); + super(scheduler, "event-loop-", idx); this.eventLoopThread = eventLoopThread; + // set event loop thread to almost-maximum priority + setPriority(Thread.MAX_PRIORITY - 1); } void runThreadBody() { @@ -54,10 +56,6 @@ void setWaitTime(long nanos) { waitTimeHandle.setOpaque(this, nanos); } - void start() { - super.start(); - } - public void execute(final Runnable command) { eventLoopThread.enqueue(this); } diff --git a/src/main/java/org/jboss/threads/virtual/Scheduler.java b/src/main/java/org/jboss/threads/virtual/Scheduler.java index 7555e3f4..c2a238a6 100644 --- a/src/main/java/org/jboss/threads/virtual/Scheduler.java +++ b/src/main/java/org/jboss/threads/virtual/Scheduler.java @@ -8,7 +8,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; -import java.util.function.Function; import java.util.stream.Stream; import io.smallrye.common.annotation.Experimental; @@ -37,15 +36,15 @@ public final class Scheduler implements Executor { } /** - * Construct and start a new event loop thread. + * Construct and start a new event loop thread for this scheduler. * - * @param eventLoopFactory the event loop factory (must not be {@code null}) + * @param eventLoop the event loop to use (must not be {@code null}) * @return the new event loop thread (not {@code null}) * @throws NullPointerException if the factory returned a {@code null} event loop */ - public EventLoopThread newEventLoopThread(Function eventLoopFactory) { - Assert.checkNotNullParam("eventLoopFactory", eventLoopFactory); - EventLoopThread eventLoopThread = new EventLoopThread(this, eventLoopIdx.getAndIncrement(), eventLoopFactory); + public EventLoopThread newEventLoopThread(EventLoop eventLoop) { + Assert.checkNotNullParam("eventLoop", eventLoop); + EventLoopThread eventLoopThread = new EventLoopThread(this, eventLoopIdx.getAndIncrement(), eventLoop); eventLoopThread.start(); return eventLoopThread; } @@ -74,6 +73,17 @@ void executeOnEventLoop(final EventLoopThread eventLoopThread, final Runnable co new UserThreadScheduler(this, command, threadIdx.getAndIncrement(), eventLoopThread).start(); } + /** + * {@return the current event loop carrier thread, or {@code null} if the current thread is not currently carried by an event loop thread} + */ + public static EventLoopThread currentEventLoopThread() { + if (Thread.currentThread().isVirtual() && Access.currentCarrier() instanceof EventLoopThread elt) { + return elt; + } else { + return null; + } + } + /** * Indicate that the current thread is going to be performing I/O-intensive operations * with relatively little CPU or native usage. @@ -244,6 +254,38 @@ public static void yieldNanos(long nanos) { Thread.yield(); } + /** + * {@return the priority of the current thread or virtual thread} + */ + public static int currentPriority() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + return ts.priority(); + } else { + return thread.getPriority(); + } + } + + /** + * Change the priority of the current virtual thread, if possible. + * + * @param newPriority the new virtual thread priority + */ + public static void changePriority(int newPriority) { + newPriority = Math.min(Math.max(newPriority, Thread.MIN_PRIORITY), Thread.MAX_PRIORITY); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + int old = ts.priority(); + if (newPriority != old) { + // apply new priority + ts.setPriority(newPriority); + Thread.yield(); + } + } else { + thread.setPriority(newPriority); + } + } + ThreadContainer container() { return container; } diff --git a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java index aafd9206..709eb815 100644 --- a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java @@ -17,11 +17,18 @@ abstract class ThreadScheduler implements ScheduledExecutorService, Runnable { private static final VarHandle yieldedHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "yielded", VarHandle.class, ThreadScheduler.class, boolean.class); private static final VarHandle delayHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "delay", VarHandle.class, ThreadScheduler.class, long.class); + private static final VarHandle priorityHandle = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), "priority", VarHandle.class, ThreadScheduler.class, int.class); /** * The system nanos time when this task started waiting. * Accessed only by carrier threads. */ private long waitingSinceTime; + /** + * The current thread priority, between {@link Thread#MIN_PRIORITY} (lowest) or {@link Thread#MAX_PRIORITY}. + * Accessed by carrier threads and virtual threads. + */ + @SuppressWarnings("unused") // priorityHandle + private int priority = Thread.NORM_PRIORITY; /** * The nanosecond delay time for scheduling. * Accessed by carrier threads and virtual threads. @@ -63,12 +70,52 @@ long waitingSinceTime() { return waitingSinceTime; } + int priority() { + return (int) priorityHandle.getOpaque(this); + } + + void setPriority(final int priority) { + priorityHandle.setOpaque(this, priority); + } + /** - * {@return the wait time of this thread in nanos} + * {@return the number of nanoseconds that this thread has been waiting for} + * The higher the waiting-since time, the higher priority a thread will have. + * * @param current the current time */ long waitingSince(long current) { - return current - waitingSinceTime - (long) delayHandle.getOpaque(this); + long delay = (long) delayHandle.getOpaque(this); + // delay is always 0 or positive + long nanos = Math.max(0, current - waitingSinceTime) - delay; + // nanos may be negative now + int priority = priority(); + if (priority < Thread.NORM_PRIORITY) { + // lower priority, so make nanos less positive or more negative + if (nanos < 0) { + int shift = priority - Thread.NORM_PRIORITY; + if (shift > Long.numberOfLeadingZeros(- nanos)) { + nanos = Long.MIN_VALUE; + } else { + nanos <<= shift; + } + } else { + nanos >>= Thread.NORM_PRIORITY - priority; + } + } else if (priority > Thread.NORM_PRIORITY) { + // higher priority, so make nanos more positive or less negative + if (nanos < 0) { + nanos >>= Thread.NORM_PRIORITY - priority; + } else { + int shift = priority - Thread.NORM_PRIORITY; + if (shift > Long.numberOfLeadingZeros(nanos)) { + nanos = Long.MAX_VALUE; + } else { + nanos <<= shift; + } + } + } + return nanos; } /** @@ -78,6 +125,7 @@ public void run() { assert ! Thread.currentThread().isVirtual(); // reset delay delayBy(0); + // todo: we could change the carrier thread priority when running on the pool, but it might not pay off try { Access.continuationOf(virtualThread).run(); } finally { diff --git a/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java index 0378873c..73778530 100644 --- a/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/UserThreadScheduler.java @@ -26,7 +26,7 @@ final class UserThreadScheduler extends ThreadScheduler { } private UserThreadScheduler(final Scheduler scheduler, final Runnable task, final Dispatcher dispatcher, final long idx) { - super(scheduler, "User thread", idx); + super(scheduler, "user-", idx); this.dispatcher = dispatcher; this.task = task; } From 07b5d8c6079f033b72079969927ce2eb4a7635eb Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Mon, 24 Feb 2025 07:27:56 -0600 Subject: [PATCH 6/8] Fix mathematical error in priority calculation --- .../threads/virtual/EventLoopThread.java | 16 ++++++++- .../threads/virtual/ThreadScheduler.java | 33 ++----------------- 2 files changed, 18 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java index 657890ff..b77e2d89 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java @@ -23,6 +23,10 @@ * use {@link #virtualThread()}. */ public final class EventLoopThread extends JBossThread implements Executor { + /** + * The number of nanoseconds to offset the wait time by, per priority point. + */ + private static final long PRIORITY_BIAS = 50_000L; /** * The virtual thread which runs the event handler. */ @@ -243,7 +247,17 @@ void enqueue(final ThreadScheduler continuation) { */ private int compare(ThreadScheduler o1, ThreadScheduler o2) { long cmpNanos = this.cmpNanos; - return Long.compare(o2.waitingSince(cmpNanos), o1.waitingSince(cmpNanos)); + + // with priority, we have a potentially greater-than-64-bit number + long w1 = o1.waitingSince(cmpNanos); + int s1 = Thread.NORM_PRIORITY - o1.priority(); + w1 += s1 * PRIORITY_BIAS; + + long w2 = o2.waitingSince(cmpNanos); + int s2 = Thread.NORM_PRIORITY - o2.priority(); + w2 += s2 * PRIORITY_BIAS; + + return Long.compare(w2, w1); } ScheduledFuture schedule(final Runnable command, final long nanos) { diff --git a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java index 709eb815..0af0138b 100644 --- a/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java +++ b/src/main/java/org/jboss/threads/virtual/ThreadScheduler.java @@ -81,41 +81,14 @@ void setPriority(final int priority) { /** * {@return the number of nanoseconds that this thread has been waiting for} * The higher the waiting-since time, the higher priority a thread will have. + * This value may be negative if the wait time includes a delay. * * @param current the current time */ long waitingSince(long current) { long delay = (long) delayHandle.getOpaque(this); - // delay is always 0 or positive - long nanos = Math.max(0, current - waitingSinceTime) - delay; - // nanos may be negative now - int priority = priority(); - if (priority < Thread.NORM_PRIORITY) { - // lower priority, so make nanos less positive or more negative - if (nanos < 0) { - int shift = priority - Thread.NORM_PRIORITY; - if (shift > Long.numberOfLeadingZeros(- nanos)) { - nanos = Long.MIN_VALUE; - } else { - nanos <<= shift; - } - } else { - nanos >>= Thread.NORM_PRIORITY - priority; - } - } else if (priority > Thread.NORM_PRIORITY) { - // higher priority, so make nanos more positive or less negative - if (nanos < 0) { - nanos >>= Thread.NORM_PRIORITY - priority; - } else { - int shift = priority - Thread.NORM_PRIORITY; - if (shift > Long.numberOfLeadingZeros(nanos)) { - nanos = Long.MAX_VALUE; - } else { - nanos <<= shift; - } - } - } - return nanos; + // delay is always 0 or positive, so result may be negative + return Math.max(0, current - waitingSinceTime) - delay; } /** From 320553ab33af326761e1fb201458b31aee68f022 Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Mon, 24 Feb 2025 07:57:17 -0600 Subject: [PATCH 7/8] Move user interface methods to their own class --- .../org/jboss/threads/virtual/EventLoop.java | 2 +- .../threads/virtual/EventLoopThread.java | 2 +- .../org/jboss/threads/virtual/Scheduler.java | 214 ----------------- .../jboss/threads/virtual/VirtualThreads.java | 225 ++++++++++++++++++ 4 files changed, 227 insertions(+), 216 deletions(-) create mode 100644 src/main/java/org/jboss/threads/virtual/VirtualThreads.java diff --git a/src/main/java/org/jboss/threads/virtual/EventLoop.java b/src/main/java/org/jboss/threads/virtual/EventLoop.java index 4c30fab4..667d7012 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoop.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoop.java @@ -27,7 +27,7 @@ protected EventLoop() {} * After each invocation of this method, up to one other waiting thread will be continued. * Since this generally would lead to busy-looping, * the implementation of this method should - * {@linkplain Scheduler#yieldNanos(long) yield for some amount of time} before returning to allow other threads to run. + * {@linkplain VirtualThreads#yieldNanos(long) yield for some amount of time} before returning to allow other threads to run. *

* Note that {@linkplain Thread#sleep(long) sleeping} instead of parking may cause latency spikes, * so it is not recommended. diff --git a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java index b77e2d89..f0012847 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoopThread.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoopThread.java @@ -119,7 +119,7 @@ public void run() { q.addFirst(this); } // wait for a new task - Scheduler.parkAndResumeOn(EventLoopThread.this); + VirtualThreads.parkAndResumeOn(EventLoopThread.this); } } } diff --git a/src/main/java/org/jboss/threads/virtual/Scheduler.java b/src/main/java/org/jboss/threads/virtual/Scheduler.java index c2a238a6..f5a9c84e 100644 --- a/src/main/java/org/jboss/threads/virtual/Scheduler.java +++ b/src/main/java/org/jboss/threads/virtual/Scheduler.java @@ -7,7 +7,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.LockSupport; import java.util.stream.Stream; import io.smallrye.common.annotation.Experimental; @@ -73,219 +72,6 @@ void executeOnEventLoop(final EventLoopThread eventLoopThread, final Runnable co new UserThreadScheduler(this, command, threadIdx.getAndIncrement(), eventLoopThread).start(); } - /** - * {@return the current event loop carrier thread, or {@code null} if the current thread is not currently carried by an event loop thread} - */ - public static EventLoopThread currentEventLoopThread() { - if (Thread.currentThread().isVirtual() && Access.currentCarrier() instanceof EventLoopThread elt) { - return elt; - } else { - return null; - } - } - - /** - * Indicate that the current thread is going to be performing I/O-intensive operations - * with relatively little CPU or native usage. - * After this method returns, the current thread will be carried by the given event loop thread. - * - * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) - */ - public static void resumeOn(EventLoopThread eventLoopThread) { - Assert.checkNotNullParam("eventLoopThread", eventLoopThread); - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { - ts.resumeOn(eventLoopThread.dispatcher()); - } else { - throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); - } - } - - /** - * Indicate that the current thread is going to be performing native or CPU-intensive operations - * with relatively little I/O usage. - * After this method returns, the current thread will be carried by a pool thread. - */ - public static void resumeOnPool() { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { - ts.resumeOn(ts.scheduler().poolDispatcher()); - } - } - - /** - * Park and resume on the given event loop thread. - * After this method returns, the current thread will be carried by the given event loop thread. - * - * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) - * @see LockSupport#park() - */ - public static void parkAndResumeOn(EventLoopThread eventLoopThread) { - Assert.checkNotNullParam("eventLoopThread", eventLoopThread); - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { - ts.parkAndResumeOn(null, eventLoopThread.dispatcher()); - } else { - throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); - } - } - - /** - * Park and resume on the given event loop thread. - * After this method returns, the current thread will be carried by the given event loop thread. - * - * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) - * @param blocker the blocker object to register - * @see LockSupport#park(Object) - */ - public static void parkAndResumeOn(EventLoopThread eventLoopThread, Object blocker) { - Assert.checkNotNullParam("eventLoopThread", eventLoopThread); - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { - ts.parkAndResumeOn(blocker, eventLoopThread.dispatcher()); - } else { - throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); - } - } - - /** - * Park and resume on the blocking pool. - * After this method returns, the current thread will be carried by a pool thread. - * @see LockSupport#park() - */ - public static void parkAndResumeOnPool() { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { - ts.parkAndResumeOn(null, ts.scheduler().poolDispatcher()); - } - } - - /** - * Park and resume on the blocking pool. - * After this method returns, the current thread will be carried by a pool thread. - * - * @param blocker the blocker object to register - * @see LockSupport#park(Object) - */ - public static void parkAndResumeOnPool(Object blocker) { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { - ts.parkAndResumeOn(blocker, ts.scheduler().poolDispatcher()); - } - } - - /** - * Park and resume on the given event loop thread. - * After this method returns, the current thread will be carried by the given event loop thread. - * - * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) - * @param nanos the number of nanoseconds to park for - * @see LockSupport#parkNanos(long) - */ - public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, long nanos) { - Assert.checkNotNullParam("eventLoopThread", eventLoopThread); - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { - ts.parkNanosAndResumeOn(null, nanos, eventLoopThread.dispatcher()); - } else { - throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); - } - } - - /** - * Park and resume on the given event loop thread. - * After this method returns, the current thread will be carried by the given event loop thread. - * - * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) - * @param blocker the blocker object to register (see {@link LockSupport#park(Object)}) - * @param nanos the number of nanoseconds to park for - * @see LockSupport#parkNanos(Object, long) - */ - public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, Object blocker, long nanos) { - Assert.checkNotNullParam("eventLoopThread", eventLoopThread); - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { - ts.parkNanosAndResumeOn(blocker, nanos, eventLoopThread.dispatcher()); - } else { - throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); - } - } - - /** - * Park and resume on the blocking pool. - * After this method returns, the current thread will be carried by a pool thread. - * - * @param nanos the number of nanoseconds to park for - * @see LockSupport#parkNanos(long) - */ - public static void parkNanosAndResumeOnPool(long nanos) { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { - ts.parkNanosAndResumeOn(null, nanos, ts.scheduler().poolDispatcher()); - } - } - - /** - * Park and resume on the blocking pool. - * After this method returns, the current thread will be carried by a pool thread. - * - * @param blocker the blocker object to register - * @param nanos the number of nanoseconds to park for - * @see LockSupport#parkNanos(Object, long) - */ - public static void parkNanosAndResumeOnPool(Object blocker, long nanos) { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { - ts.parkNanosAndResumeOn(blocker, nanos, ts.scheduler().poolDispatcher()); - } - } - - /** - * Yield execution to any task is already waiting or will start waiting within the next {@code nanos} nanoseconds. - * If no tasks remain within the given criteria, the current thread will resume. - * - * @param nanos the number of nanoseconds to attempt to yield for - */ - public static void yieldNanos(long nanos) { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { - ts.delayBy(Math.max(0, nanos)); - } - Thread.yield(); - } - - /** - * {@return the priority of the current thread or virtual thread} - */ - public static int currentPriority() { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { - return ts.priority(); - } else { - return thread.getPriority(); - } - } - - /** - * Change the priority of the current virtual thread, if possible. - * - * @param newPriority the new virtual thread priority - */ - public static void changePriority(int newPriority) { - newPriority = Math.min(Math.max(newPriority, Thread.MIN_PRIORITY), Thread.MAX_PRIORITY); - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { - int old = ts.priority(); - if (newPriority != old) { - // apply new priority - ts.setPriority(newPriority); - Thread.yield(); - } - } else { - thread.setPriority(newPriority); - } - } - ThreadContainer container() { return container; } diff --git a/src/main/java/org/jboss/threads/virtual/VirtualThreads.java b/src/main/java/org/jboss/threads/virtual/VirtualThreads.java new file mode 100644 index 00000000..34d12e3d --- /dev/null +++ b/src/main/java/org/jboss/threads/virtual/VirtualThreads.java @@ -0,0 +1,225 @@ +package org.jboss.threads.virtual; + +import java.util.concurrent.locks.LockSupport; + +import io.smallrye.common.constraint.Assert; + +/** + * A class with utility methods for manipulating the current virtual thread. + */ +public final class VirtualThreads { + private VirtualThreads() {} + + /** + * {@return the current event loop carrier thread, or {@code null} if the current thread is not currently carried by an event loop thread} + */ + public static EventLoopThread currentEventLoopThread() { + if (Thread.currentThread().isVirtual() && Access.currentCarrier() instanceof EventLoopThread elt) { + return elt; + } else { + return null; + } + } + + /** + * Indicate that the current thread is going to be performing I/O-intensive operations + * with relatively little CPU or native usage. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + */ + public static void resumeOn(EventLoopThread eventLoopThread) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.resumeOn(eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Indicate that the current thread is going to be performing native or CPU-intensive operations + * with relatively little I/O usage. + * After this method returns, the current thread will be carried by a pool thread. + */ + public static void resumeOnPool() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.resumeOn(ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @see LockSupport#park() + */ + public static void parkAndResumeOn(EventLoopThread eventLoopThread) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkAndResumeOn(null, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param blocker the blocker object to register + * @see LockSupport#park(Object) + */ + public static void parkAndResumeOn(EventLoopThread eventLoopThread, Object blocker) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkAndResumeOn(blocker, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * @see LockSupport#park() + */ + public static void parkAndResumeOnPool() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkAndResumeOn(null, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param blocker the blocker object to register + * @see LockSupport#park(Object) + */ + public static void parkAndResumeOnPool(Object blocker) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkAndResumeOn(blocker, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(long) + */ + public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, long nanos) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkNanosAndResumeOn(null, nanos, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the given event loop thread. + * After this method returns, the current thread will be carried by the given event loop thread. + * + * @param eventLoopThread the event loop thread to resume on (must not be {@code null}) + * @param blocker the blocker object to register (see {@link LockSupport#park(Object)}) + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(Object, long) + */ + public static void parkNanosAndResumeOn(EventLoopThread eventLoopThread, Object blocker, long nanos) { + Assert.checkNotNullParam("eventLoopThread", eventLoopThread); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts && ts.scheduler() == eventLoopThread.owner()) { + ts.parkNanosAndResumeOn(blocker, nanos, eventLoopThread.dispatcher()); + } else { + throw new IllegalArgumentException("Event loop thread " + eventLoopThread + " does not belong to the same scheduler as the current thread"); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(long) + */ + public static void parkNanosAndResumeOnPool(long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkNanosAndResumeOn(null, nanos, ts.scheduler().poolDispatcher()); + } + } + + /** + * Park and resume on the blocking pool. + * After this method returns, the current thread will be carried by a pool thread. + * + * @param blocker the blocker object to register + * @param nanos the number of nanoseconds to park for + * @see LockSupport#parkNanos(Object, long) + */ + public static void parkNanosAndResumeOnPool(Object blocker, long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof UserThreadScheduler ts) { + ts.parkNanosAndResumeOn(blocker, nanos, ts.scheduler().poolDispatcher()); + } + } + + /** + * Yield execution to any task is already waiting or will start waiting within the next {@code nanos} nanoseconds. + * If no tasks remain within the given criteria, the current thread will resume. + * + * @param nanos the number of nanoseconds to attempt to yield for + */ + public static void yieldNanos(long nanos) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + ts.delayBy(Math.max(0, nanos)); + } + Thread.yield(); + } + + /** + * {@return the priority of the current thread or virtual thread} + */ + public static int currentPriority() { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + return ts.priority(); + } else { + return thread.getPriority(); + } + } + + /** + * Change the priority of the current virtual thread, if possible. + * + * @param newPriority the new virtual thread priority + */ + public static void changePriority(int newPriority) { + newPriority = Math.min(Math.max(newPriority, Thread.MIN_PRIORITY), Thread.MAX_PRIORITY); + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && Access.schedulerOf(thread) instanceof ThreadScheduler ts) { + int old = ts.priority(); + if (newPriority != old) { + // apply new priority + ts.setPriority(newPriority); + Thread.yield(); + } + } else { + thread.setPriority(newPriority); + } + } +} From a46227d5c925efd5530b9820a4521c856c24534f Mon Sep 17 00:00:00 2001 From: "David M. Lloyd" Date: Mon, 24 Feb 2025 09:44:25 -0600 Subject: [PATCH 8/8] Minor doc fix --- src/main/java/org/jboss/threads/virtual/EventLoop.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jboss/threads/virtual/EventLoop.java b/src/main/java/org/jboss/threads/virtual/EventLoop.java index 667d7012..df2e2e87 100644 --- a/src/main/java/org/jboss/threads/virtual/EventLoop.java +++ b/src/main/java/org/jboss/threads/virtual/EventLoop.java @@ -1,5 +1,7 @@ package org.jboss.threads.virtual; +import java.util.concurrent.locks.LockSupport; + import io.smallrye.common.annotation.Experimental; /** @@ -29,7 +31,7 @@ protected EventLoop() {} * the implementation of this method should * {@linkplain VirtualThreads#yieldNanos(long) yield for some amount of time} before returning to allow other threads to run. *

- * Note that {@linkplain Thread#sleep(long) sleeping} instead of parking may cause latency spikes, + * Note that {@linkplain Thread#sleep(long) sleeping} or {@linkplain LockSupport#park() parking} may cause latency spikes, * so it is not recommended. *

* This method should only be called from the event loop virtual thread.