8338383: Implement JEP 491: Synchronize Virtual Threads without Pinning

Co-authored-by: Patricio Chilano Mateo <pchilanomate@openjdk.org>
Co-authored-by: Alan Bateman <alanb@openjdk.org>
Co-authored-by: Andrew Haley <aph@openjdk.org>
Co-authored-by: Fei Yang <fyang@openjdk.org>
Co-authored-by: Coleen Phillimore <coleenp@openjdk.org>
Co-authored-by: Richard Reingruber <rrich@openjdk.org>
Co-authored-by: Martin Doerr <mdoerr@openjdk.org>
Reviewed-by: aboldtch, dholmes, coleenp, fbredberg, dlong, sspitsyn
This commit is contained in:
Patricio Chilano Mateo 2024-11-12 15:23:48 +00:00
parent 8a2a75e56d
commit 78b80150e0
246 changed files with 8295 additions and 2755 deletions

View file

@ -41,7 +41,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
import jdk.internal.misc.CarrierThread;
@ -70,12 +69,12 @@ final class VirtualThread extends BaseVirtualThread {
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
private static final int TRACE_PINNING_MODE = tracePinningMode();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");
private static final long ON_WAITING_LIST = U.objectFieldOffset(VirtualThread.class, "onWaitingList");
// scheduler and continuation
private final Executor scheduler;
@ -106,6 +105,21 @@ final class VirtualThread extends BaseVirtualThread {
* TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
* TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
*
* RUNNING -> BLOCKING // blocking on monitor enter
* BLOCKING -> BLOCKED // blocked on monitor enter
* BLOCKED -> UNBLOCKED // unblocked, may be scheduled to continue
* UNBLOCKED -> RUNNING // continue execution after blocked on monitor enter
*
* RUNNING -> WAITING // transitional state during wait on monitor
* WAITING -> WAITED // waiting on monitor
* WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner
* WAITED -> UNBLOCKED // timed-out/interrupted
*
* RUNNING -> TIMED_WAITING // transition state during timed-waiting on monitor
* TIMED_WAITING -> TIMED_WAITED // timed-waiting on monitor
* TIMED_WAITED -> BLOCKED // notified, waiting to be unblocked by monitor owner
* TIMED_WAITED -> UNBLOCKED // timed-out/interrupted
*
* RUNNING -> YIELDING // Thread.yield
* YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
* YIELDING -> RUNNING // cont.yield failed
@ -128,18 +142,44 @@ final class VirtualThread extends BaseVirtualThread {
private static final int YIELDING = 10;
private static final int YIELDED = 11; // unmounted but runnable
// monitor enter
private static final int BLOCKING = 12;
private static final int BLOCKED = 13; // unmounted
private static final int UNBLOCKED = 14; // unmounted but runnable
// monitor wait/timed-wait
private static final int WAITING = 15;
private static final int WAIT = 16; // waiting in Object.wait
private static final int TIMED_WAITING = 17;
private static final int TIMED_WAIT = 18; // waiting in timed-Object.wait
private static final int TERMINATED = 99; // final state
// can be suspended from scheduling when unmounted
private static final int SUSPENDED = 1 << 8;
// parking permit
// parking permit made available by LockSupport.unpark
private volatile boolean parkPermit;
// timeout for timed-park, in nanoseconds, only accessed on current/carrier thread
private long parkTimeout;
// blocking permit made available by unblocker thread when another thread exits monitor
private volatile boolean blockPermit;
// timer task for timed-park, only accessed on current/carrier thread
// true when on the list of virtual threads waiting to be unblocked
private volatile boolean onWaitingList;
// next virtual thread on the list of virtual threads waiting to be unblocked
private volatile VirtualThread next;
// notified by Object.notify/notifyAll while waiting in Object.wait
private volatile boolean notified;
// timed-wait support
private byte timedWaitSeqNo;
// timeout for timed-park and timed-wait, only accessed on current/carrier thread
private long timeout;
// timer task for timed-park and timed-wait, only accessed on current/carrier thread
private Future<?> timeoutTask;
// carrier thread when mounted, accessed by VM
@ -202,18 +242,6 @@ final class VirtualThread extends BaseVirtualThread {
}
@Override
protected void onPinned(Continuation.Pinned reason) {
if (TRACE_PINNING_MODE > 0) {
boolean printAll = (TRACE_PINNING_MODE == 1);
VirtualThread vthread = (VirtualThread) Thread.currentThread();
int oldState = vthread.state();
try {
// avoid printing when in transition states
vthread.setState(RUNNING);
PinnedThreadPrinter.printStackTrace(System.out, reason, printAll);
} finally {
vthread.setState(oldState);
}
}
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@ -245,16 +273,20 @@ final class VirtualThread extends BaseVirtualThread {
// set state to RUNNING
int initialState = state();
if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED) {
if (initialState == STARTED || initialState == UNPARKED
|| initialState == UNBLOCKED || initialState == YIELDED) {
// newly started or continue after parking/blocking/Thread.yield
if (!compareAndSetState(initialState, RUNNING)) {
return;
}
// consume permit when continuing after parking. If continuing after a
// timed-park then the timeout task is cancelled.
// consume permit when continuing after parking or blocking. If continue
// after a timed-park or timed-wait then the timeout task is cancelled.
if (initialState == UNPARKED) {
cancelTimeoutTask();
setParkPermit(false);
} else if (initialState == UNBLOCKED) {
cancelTimeoutTask();
blockPermit = false;
}
} else {
// not runnable
@ -275,8 +307,8 @@ final class VirtualThread extends BaseVirtualThread {
}
/**
* Cancel timeout task when continuing after a timed-park. The
* timeout task may be executing, or may have already completed.
* Cancel timeout task when continuing after timed-park or timed-wait.
* The timeout task may be executing, or may have already completed.
*/
private void cancelTimeoutTask() {
if (timeoutTask != null) {
@ -511,7 +543,7 @@ final class VirtualThread extends BaseVirtualThread {
/**
* Invoked in the context of the carrier thread after the Continuation yields when
* parking or Thread.yield.
* parking, blocking on monitor enter, Object.wait, or Thread.yield.
*/
private void afterYield() {
assert carrierThread == null;
@ -530,8 +562,8 @@ final class VirtualThread extends BaseVirtualThread {
setState(newState = PARKED);
} else {
// schedule unpark
assert parkTimeout > 0;
timeoutTask = schedule(this::unpark, parkTimeout, NANOSECONDS);
assert timeout > 0;
timeoutTask = schedule(this::unpark, timeout, NANOSECONDS);
setState(newState = TIMED_PARKED);
}
@ -556,6 +588,56 @@ final class VirtualThread extends BaseVirtualThread {
return;
}
// blocking on monitorenter
if (s == BLOCKING) {
setState(BLOCKED);
// may have been unblocked while blocking
if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
// lazy submit if local queue is empty
lazySubmitRunContinuation();
}
return;
}
// Object.wait
if (s == WAITING || s == TIMED_WAITING) {
int newState;
if (s == WAITING) {
setState(newState = WAIT);
} else {
// For timed-wait, a timeout task is scheduled to execute. The timeout
// task will change the thread state to UNBLOCKED and submit the thread
// to the scheduler. A sequence number is used to ensure that the timeout
// task only unblocks the thread for this timed-wait. We synchronize with
// the timeout task to coordinate access to the sequence number and to
// ensure the timeout task doesn't execute until the thread has got to
// the TIMED_WAIT state.
assert timeout > 0;
synchronized (timedWaitLock()) {
byte seqNo = ++timedWaitSeqNo;
timeoutTask = schedule(() -> waitTimeoutExpired(seqNo), timeout, MILLISECONDS);
setState(newState = TIMED_WAIT);
}
}
// may have been notified while in transition to wait state
if (notified && compareAndSetState(newState, BLOCKED)) {
// may have even been unblocked already
if (blockPermit && compareAndSetState(BLOCKED, UNBLOCKED)) {
submitRunContinuation();
}
return;
}
// may have been interrupted while in transition to wait state
if (interrupted && compareAndSetState(newState, UNBLOCKED)) {
submitRunContinuation();
return;
}
return;
}
assert false;
}
@ -695,7 +777,7 @@ final class VirtualThread extends BaseVirtualThread {
// park the thread, afterYield will schedule the thread to unpark
boolean yielded = false;
setParkTimeout(nanos);
timeout = nanos;
setState(TIMED_PARKING);
try {
yielded = yieldContinuation();
@ -727,14 +809,6 @@ final class VirtualThread extends BaseVirtualThread {
private void parkOnCarrierThread(boolean timed, long nanos) {
assert state() == RUNNING;
VirtualThreadPinnedEvent event;
try {
event = new VirtualThreadPinnedEvent();
event.begin();
} catch (OutOfMemoryError e) {
event = null;
}
setState(timed ? TIMED_PINNED : PINNED);
try {
if (!parkPermit) {
@ -751,15 +825,18 @@ final class VirtualThread extends BaseVirtualThread {
// consume parking permit
setParkPermit(false);
if (event != null) {
try {
event.commit();
} catch (OutOfMemoryError e) {
// ignore
}
}
// JFR jdk.VirtualThreadPinned event
postPinnedEvent("LockSupport.park");
}
/**
* Call into VM when pinned to record a JFR jdk.VirtualThreadPinned event.
* Recording the event in the VM avoids having JFR event recorded in Java
* with the same name, but different ID, to events recorded by the VM.
*/
@Hidden
private static native void postPinnedEvent(String op);
/**
* Re-enables this virtual thread for scheduling. If this virtual thread is parked
* then its task is scheduled to continue, otherwise its next call to {@code park} or
@ -796,6 +873,49 @@ final class VirtualThread extends BaseVirtualThread {
}
}
/**
* Invoked by unblocker thread to unblock this virtual thread.
*/
private void unblock() {
assert !Thread.currentThread().isVirtual();
blockPermit = true;
if (state() == BLOCKED && compareAndSetState(BLOCKED, UNBLOCKED)) {
submitRunContinuation();
}
}
/**
* Invoked by timer thread when wait timeout for virtual thread has expired.
* If the virtual thread is in timed-wait then this method will unblock the thread
* and submit its task so that it continues and attempts to reenter the monitor.
* This method does nothing if the thread has been woken by notify or interrupt.
*/
private void waitTimeoutExpired(byte seqNo) {
assert !Thread.currentThread().isVirtual();
for (;;) {
boolean unblocked = false;
synchronized (timedWaitLock()) {
if (seqNo != timedWaitSeqNo) {
// this timeout task is for a past timed-wait
return;
}
int s = state();
if (s == TIMED_WAIT) {
unblocked = compareAndSetState(TIMED_WAIT, UNBLOCKED);
} else if (s != (TIMED_WAIT | SUSPENDED)) {
// notified or interrupted, no longer waiting
return;
}
}
if (unblocked) {
submitRunContinuation();
return;
}
// need to retry when thread is suspended in time-wait
Thread.yield();
}
}
/**
* Attempts to yield the current virtual thread (Thread.yield).
*/
@ -926,6 +1046,12 @@ final class VirtualThread extends BaseVirtualThread {
// make available parking permit, unpark thread if parked
unpark();
// if thread is waiting in Object.wait then schedule to try to reenter
int s = state();
if ((s == WAIT || s == TIMED_WAIT) && compareAndSetState(s, UNBLOCKED)) {
submitRunContinuation();
}
} else {
interrupted = true;
carrierThread.setInterrupt();
@ -970,6 +1096,7 @@ final class VirtualThread extends BaseVirtualThread {
return Thread.State.RUNNABLE;
}
case UNPARKED:
case UNBLOCKED:
case YIELDED:
// runnable, not mounted
return Thread.State.RUNNABLE;
@ -992,15 +1119,22 @@ final class VirtualThread extends BaseVirtualThread {
return Thread.State.RUNNABLE;
case PARKING:
case TIMED_PARKING:
case WAITING:
case TIMED_WAITING:
case YIELDING:
// runnable, in transition
return Thread.State.RUNNABLE;
case PARKED:
case PINNED:
case WAIT:
return Thread.State.WAITING;
case TIMED_PARKED:
case TIMED_PINNED:
case TIMED_WAIT:
return Thread.State.TIMED_WAITING;
case BLOCKING:
case BLOCKED:
return Thread.State.BLOCKED;
case TERMINATED:
return Thread.State.TERMINATED;
default:
@ -1046,13 +1180,13 @@ final class VirtualThread extends BaseVirtualThread {
case RUNNING, PINNED, TIMED_PINNED -> {
return null; // mounted
}
case PARKED, TIMED_PARKED -> {
case PARKED, TIMED_PARKED, BLOCKED, WAIT, TIMED_WAIT -> {
// unmounted, not runnable
}
case UNPARKED, YIELDED -> {
case UNPARKED, UNBLOCKED, YIELDED -> {
// unmounted, runnable
}
case PARKING, TIMED_PARKING, YIELDING -> {
case PARKING, TIMED_PARKING, BLOCKING, YIELDING, WAITING, TIMED_WAITING -> {
return null; // in transition
}
default -> throw new InternalError("" + initialState);
@ -1073,7 +1207,7 @@ final class VirtualThread extends BaseVirtualThread {
setState(initialState);
}
boolean resubmit = switch (initialState) {
case UNPARKED, YIELDED -> {
case UNPARKED, UNBLOCKED, YIELDED -> {
// resubmit as task may have run while suspended
yield true;
}
@ -1081,6 +1215,15 @@ final class VirtualThread extends BaseVirtualThread {
// resubmit if unparked while suspended
yield parkPermit && compareAndSetState(initialState, UNPARKED);
}
case BLOCKED -> {
// resubmit if unblocked while suspended
yield blockPermit && compareAndSetState(BLOCKED, UNBLOCKED);
}
case WAIT, TIMED_WAIT -> {
// resubmit if notified or interrupted while waiting (Object.wait)
// waitTimeoutExpired will retry if the timed expired when suspended
yield (notified || interrupted) && compareAndSetState(initialState, UNBLOCKED);
}
default -> throw new InternalError();
};
if (resubmit) {
@ -1175,6 +1318,14 @@ final class VirtualThread extends BaseVirtualThread {
return interruptLock;
}
/**
* Returns a lock object for coordinating timed-wait setup and timeout handling.
*/
private Object timedWaitLock() {
// use this object for now to avoid the overhead of introducing another lock
return runContinuation;
}
/**
* Disallow the current thread be suspended or preempted.
*/
@ -1205,6 +1356,10 @@ final class VirtualThread extends BaseVirtualThread {
return U.compareAndSetInt(this, STATE, expectedValue, newValue);
}
private boolean compareAndSetOnWaitingList(boolean expectedValue, boolean newValue) {
return U.compareAndSetBoolean(this, ON_WAITING_LIST, expectedValue, newValue);
}
private void setParkPermit(boolean newValue) {
if (parkPermit != newValue) {
parkPermit = newValue;
@ -1219,10 +1374,6 @@ final class VirtualThread extends BaseVirtualThread {
}
}
private void setParkTimeout(long timeout) {
parkTimeout = timeout;
}
private void setCarrierThread(Thread carrier) {
// U.putReferenceRelease(this, CARRIER_THREAD, carrier);
this.carrierThread = carrier;
@ -1255,9 +1406,6 @@ final class VirtualThread extends BaseVirtualThread {
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
// ensure VirtualThreadPinnedEvent is loaded/initialized
U.ensureClassInitialized(VirtualThreadPinnedEvent.class);
}
/**
@ -1338,18 +1486,37 @@ final class VirtualThread extends BaseVirtualThread {
}
/**
* Reads the value of the jdk.tracePinnedThreads property to determine if stack
* traces should be printed when a carrier thread is pinned when a virtual thread
* attempts to park.
* Schedule virtual threads that are ready to be scheduled after they blocked on
* monitor enter.
*/
private static int tracePinningMode() {
String propValue = GetPropertyAction.privilegedGetProperty("jdk.tracePinnedThreads");
if (propValue != null) {
if (propValue.length() == 0 || "full".equalsIgnoreCase(propValue))
return 1;
if ("short".equalsIgnoreCase(propValue))
return 2;
private static void unblockVirtualThreads() {
while (true) {
VirtualThread vthread = takeVirtualThreadListToUnblock();
while (vthread != null) {
assert vthread.onWaitingList;
VirtualThread nextThread = vthread.next;
// remove from list and unblock
vthread.next = null;
boolean changed = vthread.compareAndSetOnWaitingList(true, false);
assert changed;
vthread.unblock();
vthread = nextThread;
}
}
return 0;
}
/**
* Retrieves the list of virtual threads that are waiting to be unblocked, waiting
* if necessary until a list of one or more threads becomes available.
*/
private static native VirtualThread takeVirtualThreadListToUnblock();
static {
var unblocker = InnocuousThread.newThread("VirtualThread-unblocker",
VirtualThread::unblockVirtualThreads);
unblocker.setDaemon(true);
unblocker.start();
}
}