8343132: Remove temporary transitions from Virtual thread implementation

Reviewed-by: dholmes, sspitsyn, pchilanomate
This commit is contained in:
Alan Bateman 2024-10-31 08:53:19 +00:00
parent 2f1ba5ef09
commit dee0982c60
16 changed files with 184 additions and 263 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 1997, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1997, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -232,8 +232,8 @@ public class ThreadLocal<T> {
if (this instanceof TerminatingThreadLocal<?> ttl) {
TerminatingThreadLocal.register(ttl);
}
if (TRACE_VTHREAD_LOCALS) {
dumpStackIfVirtualThread();
if (TRACE_VTHREAD_LOCALS && t == Thread.currentThread() && t.isVirtual()) {
printStackTrace();
}
return value;
}
@ -249,8 +249,8 @@ public class ThreadLocal<T> {
*/
public void set(T value) {
set(Thread.currentThread(), value);
if (TRACE_VTHREAD_LOCALS) {
dumpStackIfVirtualThread();
if (TRACE_VTHREAD_LOCALS && Thread.currentThread().isVirtual()) {
printStackTrace();
}
}
@ -799,7 +799,6 @@ public class ThreadLocal<T> {
}
}
/**
* Reads the value of the jdk.traceVirtualThreadLocals property to determine if
* a stack trace should be printed when a virtual thread sets a thread local.
@ -811,30 +810,28 @@ public class ThreadLocal<T> {
}
/**
* Print a stack trace if the current thread is a virtual thread.
* Print the stack trace of the current thread, skipping the printStackTrace frame.
* A thread local is used to detect reentrancy as the printing may itself use
* thread locals.
*/
static void dumpStackIfVirtualThread() {
if (Thread.currentThread() instanceof VirtualThread vthread) {
private void printStackTrace() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map.getEntry(DUMPING_STACK) == null) {
map.set(DUMPING_STACK, true);
try {
var stack = StackWalkerHolder.STACK_WALKER.walk(s ->
var stack = StackWalker.getInstance().walk(s ->
s.skip(1) // skip caller
.collect(Collectors.toList()));
// switch to carrier thread to avoid recursive use of thread-locals
vthread.executeOnCarrierThread(() -> {
System.out.println(vthread);
for (StackWalker.StackFrame frame : stack) {
System.out.format(" %s%n", frame.toStackTraceElement());
}
return null;
});
} catch (Exception e) {
throw new InternalError(e);
System.out.println(t);
for (StackWalker.StackFrame frame : stack) {
System.out.format(" %s%n", frame.toStackTraceElement());
}
} finally {
map.remove(DUMPING_STACK);
}
}
}
private static class StackWalkerHolder {
static final StackWalker STACK_WALKER = StackWalker.getInstance();
}
private static final ThreadLocal<Boolean> DUMPING_STACK = new ThreadLocal<>();
}

View file

@ -28,7 +28,6 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -137,13 +136,18 @@ final class VirtualThread extends BaseVirtualThread {
// parking permit
private volatile boolean parkPermit;
// timeout for timed-park, in nanoseconds, only accessed on current/carrier thread
private long parkTimeout;
// timer task for timed-park, only accessed on current/carrier thread
private Future<?> timeoutTask;
// carrier thread when mounted, accessed by VM
private volatile Thread carrierThread;
// termination object when joining, created lazily if needed
private volatile CountDownLatch termination;
/**
* Returns the default scheduler.
*/
@ -246,8 +250,10 @@ final class VirtualThread extends BaseVirtualThread {
if (!compareAndSetState(initialState, RUNNING)) {
return;
}
// consume parking permit when continuing after parking
// consume permit when continuing after parking. If continuing after a
// timed-park then the timeout task is cancelled.
if (initialState == UNPARKED) {
cancelTimeoutTask();
setParkPermit(false);
}
} else {
@ -268,6 +274,17 @@ final class VirtualThread extends BaseVirtualThread {
}
}
/**
* Cancel timeout task when continuing after a timed-park. The
* timeout task may be executing, or may have already completed.
*/
private void cancelTimeoutTask() {
if (timeoutTask != null) {
timeoutTask.cancel(false);
timeoutTask = null;
}
}
/**
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
@ -276,23 +293,21 @@ final class VirtualThread extends BaseVirtualThread {
* @param retryOnOOME true to retry indefinitely if OutOfMemoryError is thrown
* @throws RejectedExecutionException
*/
@ChangesCurrentThread
private void submitRunContinuation(Executor scheduler, boolean retryOnOOME) {
boolean done = false;
while (!done) {
try {
// The scheduler's execute method is invoked in the context of the
// carrier thread. For the default scheduler this ensures that the
// current thread is a ForkJoinWorkerThread so the task will be pushed
// to the local queue. For other schedulers, it avoids deadlock that
// would arise due to platform and virtual threads contending for a
// lock on the scheduler's submission queue.
if (currentThread() instanceof VirtualThread vthread) {
vthread.switchToCarrierThread();
// Pin the continuation to prevent the virtual thread from unmounting
// when submitting a task. For the default scheduler this ensures that
// the carrier doesn't change when pushing a task. For other schedulers
// it avoids deadlock that could arise due to carriers and virtual
// threads contending for a lock.
if (currentThread().isVirtual()) {
Continuation.pin();
try {
scheduler.execute(runContinuation);
} finally {
switchToVirtualThread(vthread);
Continuation.unpin();
}
} else {
scheduler.execute(runContinuation);
@ -311,24 +326,6 @@ final class VirtualThread extends BaseVirtualThread {
}
}
/**
* Submits the runContinuation task to given scheduler with a lazy submit.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation(ForkJoinPool pool) {
assert Thread.currentThread() instanceof CarrierThread;
try {
pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
submitRunContinuation(pool, true);
}
}
/**
* Submits the runContinuation task to the given scheduler as an external submit.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
@ -358,6 +355,30 @@ final class VirtualThread extends BaseVirtualThread {
submitRunContinuation(scheduler, true);
}
/**
* Lazy submit the runContinuation task if invoked on a carrier thread and its local
* queue is empty. If not empty, or invoked by another thread, then this method works
* like submitRunContinuation and just submits the task to the scheduler.
* If OutOfMemoryError is thrown then the submit will be retried until it succeeds.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void lazySubmitRunContinuation() {
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
ForkJoinPool pool = ct.getPool();
try {
pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
} catch (OutOfMemoryError e) {
submitRunContinuation();
}
} else {
submitRunContinuation();
}
}
/**
* Submits the runContinuation task to the scheduler. For the default scheduler, and
* calling it a virtual thread that uses the default scheduler, the task will be
@ -474,45 +495,6 @@ final class VirtualThread extends BaseVirtualThread {
notifyJvmtiUnmount(/*hide*/false);
}
/**
* Sets the current thread to the current carrier thread.
*/
@ChangesCurrentThread
@JvmtiMountTransition
private void switchToCarrierThread() {
notifyJvmtiHideFrames(true);
Thread carrier = this.carrierThread;
assert Thread.currentThread() == this
&& carrier == Thread.currentCarrierThread();
carrier.setCurrentThread(carrier);
}
/**
* Sets the current thread to the given virtual thread.
*/
@ChangesCurrentThread
@JvmtiMountTransition
private static void switchToVirtualThread(VirtualThread vthread) {
Thread carrier = vthread.carrierThread;
assert carrier == Thread.currentCarrierThread();
carrier.setCurrentThread(vthread);
notifyJvmtiHideFrames(false);
}
/**
* Executes the given value returning task on the current carrier thread.
*/
@ChangesCurrentThread
<V> V executeOnCarrierThread(Callable<V> task) throws Exception {
assert Thread.currentThread() == this;
switchToCarrierThread();
try {
return task.call();
} finally {
switchToVirtualThread(this);
}
}
/**
* Invokes Continuation.yield, notifying JVMTI (if enabled) to hide frames until
* the continuation continues.
@ -528,9 +510,8 @@ final class VirtualThread extends BaseVirtualThread {
}
/**
* Invoked after the continuation yields. If parking then it sets the state
* and also re-submits the task to continue if unparked while parking.
* If yielding due to Thread.yield then it just submits the task to continue.
* Invoked in the context of the carrier thread after the Continuation yields when
* parking or Thread.yield.
*/
private void afterYield() {
assert carrierThread == null;
@ -544,17 +525,20 @@ final class VirtualThread extends BaseVirtualThread {
// LockSupport.park/parkNanos
if (s == PARKING || s == TIMED_PARKING) {
int newState = (s == PARKING) ? PARKED : TIMED_PARKED;
setState(newState);
int newState;
if (s == PARKING) {
setState(newState = PARKED);
} else {
// schedule unpark
assert parkTimeout > 0;
timeoutTask = schedule(this::unpark, parkTimeout, NANOSECONDS);
setState(newState = TIMED_PARKED);
}
// may have been unparked while parking
if (parkPermit && compareAndSetState(newState, UNPARKED)) {
// lazy submit to continue on the current carrier if possible
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
lazySubmitRunContinuation(ct.getPool());
} else {
submitRunContinuation();
}
// lazy submit if local queue is empty
lazySubmitRunContinuation();
}
return;
}
@ -672,7 +656,9 @@ final class VirtualThread extends BaseVirtualThread {
boolean yielded = false;
setState(PARKING);
try {
yielded = yieldContinuation(); // may throw
yielded = yieldContinuation();
} catch (OutOfMemoryError e) {
// park on carrier
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
@ -707,21 +693,23 @@ final class VirtualThread extends BaseVirtualThread {
if (nanos > 0) {
long startTime = System.nanoTime();
// park the thread, afterYield will schedule the thread to unpark
boolean yielded = false;
Future<?> unparker = scheduleUnpark(nanos); // may throw OOME
setParkTimeout(nanos);
setState(TIMED_PARKING);
try {
yielded = yieldContinuation(); // may throw
yielded = yieldContinuation();
} catch (OutOfMemoryError e) {
// park on carrier
} finally {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == TIMED_PARKING;
setState(RUNNING);
}
cancel(unparker);
}
// park on carrier thread for remaining time when pinned
// park on carrier thread for remaining time when pinned (or OOME)
if (!yielded) {
long remainingNanos = nanos - (System.nanoTime() - startTime);
parkOnCarrierThread(true, remainingNanos);
@ -772,38 +760,6 @@ final class VirtualThread extends BaseVirtualThread {
}
}
/**
* Schedule this virtual thread to be unparked after a given delay.
*/
@ChangesCurrentThread
private Future<?> scheduleUnpark(long nanos) {
assert Thread.currentThread() == this;
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
return schedule(this::unpark, nanos, NANOSECONDS);
} finally {
switchToVirtualThread(this);
}
}
/**
* Cancels a task if it has not completed.
*/
@ChangesCurrentThread
private void cancel(Future<?> future) {
assert Thread.currentThread() == this;
if (!future.isDone()) {
// need to switch to current carrier thread to avoid nested parking
switchToCarrierThread();
try {
future.cancel(false);
} finally {
switchToVirtualThread(this);
}
}
}
/**
* Re-enables this virtual thread for scheduling. If this virtual thread is parked
* then its task is scheduled to continue, otherwise its next call to {@code park} or
@ -1041,10 +997,10 @@ final class VirtualThread extends BaseVirtualThread {
return Thread.State.RUNNABLE;
case PARKED:
case PINNED:
return State.WAITING;
return Thread.State.WAITING;
case TIMED_PARKED:
case TIMED_PINNED:
return State.TIMED_WAITING;
return Thread.State.TIMED_WAITING;
case TERMINATED:
return Thread.State.TERMINATED;
default:
@ -1263,6 +1219,10 @@ final class VirtualThread extends BaseVirtualThread {
}
}
private void setParkTimeout(long timeout) {
parkTimeout = timeout;
}
private void setCarrierThread(Thread carrier) {
// U.putReferenceRelease(this, CARRIER_THREAD, carrier);
this.carrierThread = carrier;
@ -1286,10 +1246,6 @@ final class VirtualThread extends BaseVirtualThread {
@JvmtiMountTransition
private native void notifyJvmtiUnmount(boolean hide);
@IntrinsicCandidate
@JvmtiMountTransition
private static native void notifyJvmtiHideFrames(boolean hide);
@IntrinsicCandidate
private static native void notifyJvmtiDisableSuspend(boolean enter);

View file

@ -51,6 +51,7 @@ import java.util.Objects;
import java.util.function.Predicate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.LockSupport;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Unsafe;
@ -2632,7 +2633,7 @@ public class ForkJoinPool extends AbstractExecutorService {
private void poolSubmit(boolean signalIfEmpty, ForkJoinTask<?> task) {
Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
if (((t = JLA.currentCarrierThread()) instanceof ForkJoinWorkerThread) &&
(wt = (ForkJoinWorkerThread)t).pool == this) {
internal = true;
q = wt.workQueue;
@ -2643,6 +2644,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
q.push(task, signalIfEmpty ? this : null, internal);
}
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
/**
* Returns queue for an external submission, bypassing call to