8336254: Virtual thread implementation + test updates

Reviewed-by: sspitsyn, kevinw
This commit is contained in:
Alan Bateman 2024-07-25 04:59:01 +00:00
parent d3e51daf73
commit 6e228ce382
39 changed files with 2741 additions and 1363 deletions

View file

@ -65,7 +65,6 @@ import java.util.PropertyPermission;
import java.util.ResourceBundle;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
@ -2264,6 +2263,7 @@ public final class System {
super(fd);
}
@Override
public void write(int b) throws IOException {
boolean attempted = Blocker.begin();
try {
@ -2677,14 +2677,6 @@ public final class System {
return Thread.currentCarrierThread();
}
public <V> V executeOnCarrierThread(Callable<V> task) throws Exception {
if (Thread.currentThread() instanceof VirtualThread vthread) {
return vthread.executeOnCarrierThread(task);
} else {
return task.call();
}
}
public <T> T getCarrierThreadLocal(CarrierThreadLocal<T> local) {
return ((ThreadLocal<T>)local).getCarrierThreadLocal();
}

View file

@ -40,6 +40,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
@ -62,14 +63,13 @@ import sun.security.action.GetPropertyAction;
import static java.util.concurrent.TimeUnit.*;
/**
* A thread that is scheduled by the Java virtual machine rather than the operating
* system.
* A thread that is scheduled by the Java virtual machine rather than the operating system.
*/
final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();
private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();
private static final int TRACE_PINNING_MODE = tracePinningMode();
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
@ -217,7 +217,7 @@ final class VirtualThread extends BaseVirtualThread {
* on the current thread before the task runs or continues. It unmounts when the
* task completes or yields.
*/
@ChangesCurrentThread
@ChangesCurrentThread // allow mount/unmount to be inlined
private void runContinuation() {
// the carrier must be a platform thread
if (Thread.currentThread().isVirtual()) {
@ -257,42 +257,109 @@ final class VirtualThread extends BaseVirtualThread {
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
* @param scheduler the scheduler
* @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
try {
scheduler.execute(runContinuation);
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
@ChangesCurrentThread
private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
boolean done = false;
while (!done) {
try {
// The scheduler's execute method is invoked in the context of the
// carrier thread. For the default scheduler this ensures that the
// current thread is a ForkJoinWorkerThread so the task will be pushed
// to the local queue. For other schedulers, it avoids deadlock that
// would arise due to platform and virtual threads contending for a
// lock on the scheduler's submission queue.
if (currentThread() instanceof VirtualThread vthread) {
vthread.switchToCarrierThread();
try {
scheduler.execute(runContinuation);
} finally {
switchToVirtualThread(vthread);
}
} else {
scheduler.execute(runContinuation);
}
done = true;
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
if (retryOnOOME) {
U.park(false, 100_000_000); // 100ms
} else {
throw e;
}
}
}
}
/**
* Submits the runContinuation task to given scheduler with a lazy submit.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation(ForkJoinPool pool) {
assert Thread.currentThread() instanceof CarrierThread;
try {
pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
submitRunContinuation(pool, true);
}
}
/**
* Submits the runContinuation task to the given scheduler as an external submit.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
* @see ForkJoinPool#externalSubmit(ForkJoinTask)
*/
private void externalSubmitRunContinuation(ForkJoinPool pool) {
assert Thread.currentThread() instanceof CarrierThread;
try {
pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
submitRunContinuation(pool, true);
}
}
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to an external submission queue.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
submitRunContinuation(scheduler, true);
}
/**
* Submits the runContinuation task to the scheduler. For the default scheduler, and
* calling it a virtual thread that uses the default scheduler, the task will be
* pushed to an external submission queue. This method may throw OutOfMemoryError.
* @throws RejectedExecutionException
* @throws OutOfMemoryError
*/
private void externalSubmitRunContinuationOrThrow() {
if (scheduler == DEFAULT_SCHEDULER && currentCarrierThread() instanceof CarrierThread ct) {
try {
ct.getPool().externalSubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
} else {
submitRunContinuation(scheduler, false);
}
}
@ -385,6 +452,8 @@ final class VirtualThread extends BaseVirtualThread {
@ChangesCurrentThread
@ReservedStackAccess
private void unmount() {
assert !Thread.holdsLock(interruptLock);
// set Thread.currentThread() to return the platform thread
Thread carrier = this.carrierThread;
carrier.setCurrentThread(carrier);
@ -417,7 +486,7 @@ final class VirtualThread extends BaseVirtualThread {
*/
@ChangesCurrentThread
@JvmtiMountTransition
private void switchToVirtualThread(VirtualThread vthread) {
private static void switchToVirtualThread(VirtualThread vthread) {
Thread carrier = vthread.carrierThread;
assert carrier == Thread.currentCarrierThread();
carrier.setCurrentThread(vthread);
@ -474,13 +543,12 @@ final class VirtualThread extends BaseVirtualThread {
// may have been unparked while parking
if (parkPermit && compareAndSetState(newState, UNPARKED)) {
// lazy submit to continue on the current thread as carrier if possible
if (currentThread() instanceof CarrierThread ct) {
// lazy submit to continue on the current carrier if possible
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
lazySubmitRunContinuation(ct.getPool());
} else {
submitRunContinuation();
}
}
return;
}
@ -561,8 +629,8 @@ final class VirtualThread extends BaseVirtualThread {
// scoped values may be inherited
inheritScopedValueBindings(container);
// submit task to run thread
submitRunContinuation();
// submit task to run thread, using externalSubmit if possible
externalSubmitRunContinuationOrThrow();
started = true;
} finally {
if (!started) {
@ -707,7 +775,7 @@ final class VirtualThread extends BaseVirtualThread {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
return UNPARKER.schedule(this::unpark, nanos, NANOSECONDS);
return schedule(this::unpark, nanos, NANOSECONDS);
} finally {
switchToVirtualThread(this);
}
@ -718,6 +786,7 @@ final class VirtualThread extends BaseVirtualThread {
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
assert Thread.currentThread() == this;
if (!future.isDone()) {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
@ -730,33 +799,26 @@ final class VirtualThread extends BaseVirtualThread {
}
/**
* Re-enables this virtual thread for scheduling. If the virtual thread was
* {@link #park() parked} then it will be unblocked, otherwise its next call
* to {@code park} or {@linkplain #parkNanos(long) parkNanos} is guaranteed
* not to block.
* Re-enables this virtual thread for scheduling. If this virtual thread is parked
* then its task is scheduled to continue, otherwise its next call to {@code park} or
* {@linkplain #parkNanos(long) parkNanos} is guaranteed not to block.
* @throws RejectedExecutionException if the scheduler cannot accept a task
*/
@Override
@ChangesCurrentThread
void unpark() {
Thread currentThread = Thread.currentThread();
if (!getAndSetParkPermit(true) && currentThread != this) {
if (!getAndSetParkPermit(true) && currentThread() != this) {
int s = state();
boolean parked = (s == PARKED) || (s == TIMED_PARKED);
if (parked && compareAndSetState(s, UNPARKED)) {
if (currentThread instanceof VirtualThread vthread) {
vthread.switchToCarrierThread();
try {
submitRunContinuation();
} finally {
switchToVirtualThread(vthread);
}
} else {
submitRunContinuation();
}
} else if ((s == PINNED) || (s == TIMED_PINNED)) {
// unparked while parked
if ((s == PARKED || s == TIMED_PARKED) && compareAndSetState(s, UNPARKED)) {
submitRunContinuation();
return;
}
// unparked while parked when pinned
if (s == PINNED || s == TIMED_PINNED) {
// unpark carrier thread when pinned
notifyJvmtiDisableSuspend(true);
disableSuspendAndPreempt();
try {
synchronized (carrierThreadAccessLock()) {
Thread carrier = carrierThread;
@ -765,8 +827,9 @@ final class VirtualThread extends BaseVirtualThread {
}
}
} finally {
notifyJvmtiDisableSuspend(false);
enableSuspendAndPreempt();
}
return;
}
}
}
@ -859,11 +922,11 @@ final class VirtualThread extends BaseVirtualThread {
@Override
void blockedOn(Interruptible b) {
notifyJvmtiDisableSuspend(true);
disableSuspendAndPreempt();
try {
super.blockedOn(b);
} finally {
notifyJvmtiDisableSuspend(false);
enableSuspendAndPreempt();
}
}
@ -874,9 +937,9 @@ final class VirtualThread extends BaseVirtualThread {
checkAccess();
// if current thread is a virtual thread then prevent it from being
// suspended when entering or holding interruptLock
// suspended or unmounted when entering or holding interruptLock
Interruptible blocker;
notifyJvmtiDisableSuspend(true);
disableSuspendAndPreempt();
try {
synchronized (interruptLock) {
interrupted = true;
@ -890,18 +953,22 @@ final class VirtualThread extends BaseVirtualThread {
if (carrier != null) carrier.setInterrupt();
}
} finally {
notifyJvmtiDisableSuspend(false);
enableSuspendAndPreempt();
}
// notify blocker after releasing interruptLock
if (blocker != null) {
blocker.postInterrupt();
}
// make available parking permit, unpark thread if parked
unpark();
} else {
interrupted = true;
carrierThread.setInterrupt();
setParkPermit(true);
}
unpark();
}
@Override
@ -914,14 +981,14 @@ final class VirtualThread extends BaseVirtualThread {
assert Thread.currentThread() == this;
boolean oldValue = interrupted;
if (oldValue) {
notifyJvmtiDisableSuspend(true);
disableSuspendAndPreempt();
try {
synchronized (interruptLock) {
interrupted = false;
carrierThread.clearInterrupt();
}
} finally {
notifyJvmtiDisableSuspend(false);
enableSuspendAndPreempt();
}
}
return oldValue;
@ -946,16 +1013,18 @@ final class VirtualThread extends BaseVirtualThread {
return Thread.State.RUNNABLE;
case RUNNING:
// if mounted then return state of carrier thread
notifyJvmtiDisableSuspend(true);
try {
synchronized (carrierThreadAccessLock()) {
Thread carrierThread = this.carrierThread;
if (carrierThread != null) {
return carrierThread.threadState();
if (Thread.currentThread() != this) {
disableSuspendAndPreempt();
try {
synchronized (carrierThreadAccessLock()) {
Thread carrierThread = this.carrierThread;
if (carrierThread != null) {
return carrierThread.threadState();
}
}
} finally {
enableSuspendAndPreempt();
}
} finally {
notifyJvmtiDisableSuspend(false);
}
// runnable, mounted
return Thread.State.RUNNABLE;
@ -1068,32 +1137,49 @@ final class VirtualThread extends BaseVirtualThread {
sb.append(name);
}
sb.append("]/");
Thread carrier = carrierThread;
if (carrier != null) {
// include the carrier thread state and name when mounted
notifyJvmtiDisableSuspend(true);
// add the carrier state and thread name when mounted
boolean mounted;
if (Thread.currentThread() == this) {
mounted = appendCarrierInfo(sb);
} else {
disableSuspendAndPreempt();
try {
synchronized (carrierThreadAccessLock()) {
carrier = carrierThread;
if (carrier != null) {
String stateAsString = carrier.threadState().toString();
sb.append(stateAsString.toLowerCase(Locale.ROOT));
sb.append('@');
sb.append(carrier.getName());
}
mounted = appendCarrierInfo(sb);
}
} finally {
notifyJvmtiDisableSuspend(false);
enableSuspendAndPreempt();
}
}
// include virtual thread state when not mounted
if (carrier == null) {
// add virtual thread state when not mounted
if (!mounted) {
String stateAsString = threadState().toString();
sb.append(stateAsString.toLowerCase(Locale.ROOT));
}
return sb.toString();
}
/**
* Appends the carrier state and thread name to the string buffer if mounted.
* @return true if mounted, false if not mounted
*/
private boolean appendCarrierInfo(StringBuilder sb) {
assert Thread.currentThread() == this || Thread.holdsLock(carrierThreadAccessLock());
Thread carrier = carrierThread;
if (carrier != null) {
String stateAsString = carrier.threadState().toString();
sb.append(stateAsString.toLowerCase(Locale.ROOT));
sb.append('@');
sb.append(carrier.getName());
return true;
} else {
return false;
}
}
@Override
public int hashCode() {
return (int) threadId();
@ -1127,6 +1213,22 @@ final class VirtualThread extends BaseVirtualThread {
return interruptLock;
}
/**
* Disallow the current thread be suspended or preempted.
*/
private void disableSuspendAndPreempt() {
notifyJvmtiDisableSuspend(true);
Continuation.pin();
}
/**
* Allow the current thread be suspended or preempted.
*/
private void enableSuspendAndPreempt() {
Continuation.unpin();
notifyJvmtiDisableSuspend(false);
}
// -- wrappers for get/set of state, parking permit, and carrier thread --
private int state() {
@ -1188,10 +1290,16 @@ final class VirtualThread extends BaseVirtualThread {
private static native void registerNatives();
static {
registerNatives();
// ensure VTHREAD_GROUP is created, may be accessed by JVMTI
var group = Thread.virtualThreadGroup();
// ensure VirtualThreadPinnedEvent is loaded/initialized
U.ensureClassInitialized(VirtualThreadPinnedEvent.class);
}
/**
* Creates the default scheduler.
* Creates the default ForkJoinPool scheduler.
*/
@SuppressWarnings("removal")
private static ForkJoinPool createDefaultScheduler() {
@ -1229,22 +1337,42 @@ final class VirtualThread extends BaseVirtualThread {
}
/**
* Creates the ScheduledThreadPoolExecutor used for timed unpark.
* Schedule a runnable task to run after a delay.
*/
private static ScheduledExecutorService createDelayedTaskScheduler() {
String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
int poolSize;
private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
long tid = Thread.currentThread().threadId();
int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit);
}
/**
* Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
*/
private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
String propName = "jdk.virtualThreadScheduler.timerQueues";
String propValue = GetPropertyAction.privilegedGetProperty(propName);
int queueCount;
if (propValue != null) {
poolSize = Integer.parseInt(propValue);
queueCount = Integer.parseInt(propValue);
if (queueCount != Integer.highestOneBit(queueCount)) {
throw new RuntimeException("Value of " + propName + " must be power of 2");
}
} else {
poolSize = 1;
int ncpus = Runtime.getRuntime().availableProcessors();
queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
}
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(poolSize, task -> {
return InnocuousThread.newThread("VirtualThread-unparker", task);
});
stpe.setRemoveOnCancelPolicy(true);
return stpe;
var schedulers = new ScheduledExecutorService[queueCount];
for (int i = 0; i < queueCount; i++) {
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(1, task -> {
Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
t.setDaemon(true);
return t;
});
stpe.setRemoveOnCancelPolicy(true);
schedulers[i] = stpe;
}
return schedulers;
}
/**

View file

@ -35,7 +35,9 @@
package java.util.concurrent.locks;
import jdk.internal.misc.VirtualThreads;
import java.util.concurrent.TimeUnit;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
/**
@ -176,7 +178,7 @@ public class LockSupport {
public static void unpark(Thread thread) {
if (thread != null) {
if (thread.isVirtual()) {
VirtualThreads.unpark(thread);
JLA.unparkVirtualThread(thread);
} else {
U.unpark(thread);
}
@ -216,7 +218,7 @@ public class LockSupport {
setBlocker(t, blocker);
try {
if (t.isVirtual()) {
VirtualThreads.park();
JLA.parkVirtualThread();
} else {
U.park(false, 0L);
}
@ -264,7 +266,7 @@ public class LockSupport {
setBlocker(t, blocker);
try {
if (t.isVirtual()) {
VirtualThreads.park(nanos);
JLA.parkVirtualThread(nanos);
} else {
U.park(false, nanos);
}
@ -311,11 +313,7 @@ public class LockSupport {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
try {
if (t.isVirtual()) {
VirtualThreads.parkUntil(deadline);
} else {
U.park(true, deadline);
}
parkUntil(deadline);
} finally {
setBlocker(t, null);
}
@ -366,7 +364,7 @@ public class LockSupport {
*/
public static void park() {
if (Thread.currentThread().isVirtual()) {
VirtualThreads.park();
JLA.parkVirtualThread();
} else {
U.park(false, 0L);
}
@ -405,7 +403,7 @@ public class LockSupport {
public static void parkNanos(long nanos) {
if (nanos > 0) {
if (Thread.currentThread().isVirtual()) {
VirtualThreads.park(nanos);
JLA.parkVirtualThread(nanos);
} else {
U.park(false, nanos);
}
@ -444,7 +442,8 @@ public class LockSupport {
*/
public static void parkUntil(long deadline) {
if (Thread.currentThread().isVirtual()) {
VirtualThreads.parkUntil(deadline);
long millis = deadline - System.currentTimeMillis();
JLA.parkVirtualThread(TimeUnit.MILLISECONDS.toNanos(millis));
} else {
U.park(true, deadline);
}
@ -462,4 +461,5 @@ public class LockSupport {
private static final long PARKBLOCKER
= U.objectFieldOffset(Thread.class, "parkBlocker");
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
}