8304919: Implementation of Virtual Threads

Reviewed-by: lmesnik, cjplummer, psandoz, mchung, sspitsyn, jpai
This commit is contained in:
Alan Bateman 2023-04-11 05:49:54 +00:00
parent 39398075b7
commit 2586f36120
205 changed files with 1379 additions and 1342 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -29,6 +29,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -40,7 +41,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import jdk.internal.event.ThreadSleepEvent;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadPinnedEvent;
import jdk.internal.event.VirtualThreadStartEvent;
@ -58,6 +58,7 @@ import jdk.internal.vm.annotation.ForceInline;
import jdk.internal.vm.annotation.Hidden;
import jdk.internal.vm.annotation.IntrinsicCandidate;
import jdk.internal.vm.annotation.JvmtiMountTransition;
import jdk.internal.vm.annotation.ReservedStackAccess;
import sun.nio.ch.Interruptible;
import sun.security.action.GetPropertyAction;
import static java.util.concurrent.TimeUnit.*;
@ -175,7 +176,7 @@ final class VirtualThread extends BaseVirtualThread {
*/
private static class VThreadContinuation extends Continuation {
VThreadContinuation(VirtualThread vthread, Runnable task) {
super(VTHREAD_SCOPE, () -> vthread.run(task));
super(VTHREAD_SCOPE, wrap(vthread, task));
}
@Override
protected void onPinned(Continuation.Pinned reason) {
@ -184,6 +185,14 @@ final class VirtualThread extends BaseVirtualThread {
PinnedThreadPrinter.printStackTrace(System.out, printAll);
}
}
private static Runnable wrap(VirtualThread vthread, Runnable task) {
return new Runnable() {
@Hidden
public void run() {
vthread.run(task);
}
};
}
}
/**
@ -211,13 +220,13 @@ final class VirtualThread extends BaseVirtualThread {
}
// notify JVMTI before mount
notifyJvmtiMount(true, firstRun);
notifyJvmtiMount(/*hide*/true, firstRun);
try {
cont.run();
} finally {
if (cont.isDone()) {
afterTerminate(/*executed*/ true);
afterTerminate();
} else {
afterYield();
}
@ -291,7 +300,7 @@ final class VirtualThread extends BaseVirtualThread {
// first mount
mount();
notifyJvmtiMount(false, true);
notifyJvmtiMount(/*hide*/false, /*first*/true);
// emit JFR event if enabled
if (VirtualThreadStartEvent.isTurnedOn()) {
@ -319,7 +328,7 @@ final class VirtualThread extends BaseVirtualThread {
} finally {
// last unmount
notifyJvmtiUnmount(true, true);
notifyJvmtiUnmount(/*hide*/true, /*last*/true);
unmount();
// final state
@ -341,6 +350,7 @@ final class VirtualThread extends BaseVirtualThread {
* return, the current thread is the virtual thread.
*/
@ChangesCurrentThread
@ReservedStackAccess
private void mount() {
// sets the carrier thread
Thread carrier = Thread.currentCarrierThread();
@ -367,6 +377,7 @@ final class VirtualThread extends BaseVirtualThread {
* current thread is the current platform thread.
*/
@ChangesCurrentThread
@ReservedStackAccess
private void unmount() {
// set Thread.currentThread() to return the platform thread
Thread carrier = this.carrierThread;
@ -404,22 +415,37 @@ final class VirtualThread extends BaseVirtualThread {
notifyJvmtiHideFrames(false);
}
/**
* Executes the given value returning task on the current carrier thread.
*/
@ChangesCurrentThread
<V> V executeOnCarrierThread(Callable<V> task) throws Exception {
assert Thread.currentThread() == this;
switchToCarrierThread();
try {
return task.call();
} finally {
switchToVirtualThread(this);
}
}
/**
* Unmounts this virtual thread, invokes Continuation.yield, and re-mounts the
* thread when continued. When enabled, JVMTI must be notified from this method.
* @return true if the yield was successful
*/
@Hidden
@ChangesCurrentThread
private boolean yieldContinuation() {
// unmount
notifyJvmtiUnmount(true, false);
notifyJvmtiUnmount(/*hide*/true, /*last*/false);
unmount();
try {
return Continuation.yield(VTHREAD_SCOPE);
} finally {
// re-mount
mount();
notifyJvmtiMount(false, false);
notifyJvmtiMount(/*hide*/false, /*first*/false);
}
}
@ -436,7 +462,7 @@ final class VirtualThread extends BaseVirtualThread {
setState(PARKED);
// notify JVMTI that unmount has completed, thread is parked
notifyJvmtiUnmount(false, false);
notifyJvmtiUnmount(/*hide*/false, /*last*/false);
// may have been unparked while parking
if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
@ -452,7 +478,7 @@ final class VirtualThread extends BaseVirtualThread {
setState(RUNNABLE);
// notify JVMTI that unmount has completed, thread is runnable
notifyJvmtiUnmount(false, false);
notifyJvmtiUnmount(/*hide*/false, /*last*/false);
// external submit if there are no tasks in the local task queue
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
@ -463,17 +489,26 @@ final class VirtualThread extends BaseVirtualThread {
}
}
/**
* Invoked after the thread terminates execution. It notifies anyone
* waiting for the thread to terminate.
*/
private void afterTerminate() {
afterTerminate(true, true);
}
/**
* Invoked after the thread terminates (or start failed). This method
* notifies anyone waiting for the thread to terminate.
*
* @param notifyContainer true if its container should be notified
* @param executed true if the thread executed, false if it failed to start
*/
private void afterTerminate(boolean executed) {
private void afterTerminate(boolean notifyContainer, boolean executed) {
assert (state() == TERMINATED) && (carrierThread == null);
if (executed) {
notifyJvmtiUnmount(false, true);
notifyJvmtiUnmount(/*hide*/false, /*last*/true);
}
// notify anyone waiting for this virtual thread to terminate
@ -483,13 +518,13 @@ final class VirtualThread extends BaseVirtualThread {
termination.countDown();
}
if (executed) {
// notify container if thread executed
// notify container
if (notifyContainer) {
threadContainer().onExit(this);
// clear references to thread locals
clearReferences();
}
// clear references to thread locals
clearReferences();
}
/**
@ -506,12 +541,16 @@ final class VirtualThread extends BaseVirtualThread {
}
// bind thread to container
assert threadContainer() == null;
setThreadContainer(container);
// start thread
boolean addedToContainer = false;
boolean started = false;
container.onStart(this); // may throw
try {
container.onStart(this); // may throw
addedToContainer = true;
// scoped values may be inherited
inheritScopedValueBindings(container);
@ -521,8 +560,7 @@ final class VirtualThread extends BaseVirtualThread {
} finally {
if (!started) {
setState(TERMINATED);
container.onExit(this);
afterTerminate(/*executed*/ false);
afterTerminate(addedToContainer, /*executed*/false);
}
}
}
@ -551,14 +589,21 @@ final class VirtualThread extends BaseVirtualThread {
return;
// park the thread
boolean yielded = false;
setState(PARKING);
try {
if (!yieldContinuation()) {
// park on the carrier thread when pinned
parkOnCarrierThread(false, 0);
}
yielded = yieldContinuation(); // may throw
} finally {
assert (Thread.currentThread() == this) && (state() == RUNNING);
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == PARKING;
setState(RUNNING);
}
}
// park on the carrier thread when pinned
if (!yielded) {
parkOnCarrierThread(false, 0);
}
}
@ -582,14 +627,17 @@ final class VirtualThread extends BaseVirtualThread {
if (nanos > 0) {
long startTime = System.nanoTime();
boolean yielded;
boolean yielded = false;
Future<?> unparker = scheduleUnpark(this::unpark, nanos);
setState(PARKING);
try {
yielded = yieldContinuation();
yielded = yieldContinuation(); // may throw
} finally {
assert (Thread.currentThread() == this)
&& (state() == RUNNING || state() == PARKING);
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == PARKING;
setState(RUNNING);
}
cancel(unparker);
}
@ -611,10 +659,15 @@ final class VirtualThread extends BaseVirtualThread {
* @param nanos the waiting time in nanoseconds
*/
private void parkOnCarrierThread(boolean timed, long nanos) {
assert state() == PARKING;
assert state() == RUNNING;
var pinnedEvent = new VirtualThreadPinnedEvent();
pinnedEvent.begin();
VirtualThreadPinnedEvent event;
try {
event = new VirtualThreadPinnedEvent();
event.begin();
} catch (OutOfMemoryError e) {
event = null;
}
setState(PINNED);
try {
@ -632,7 +685,13 @@ final class VirtualThread extends BaseVirtualThread {
// consume parking permit
setParkPermit(false);
pinnedEvent.commit();
if (event != null) {
try {
event.commit();
} catch (OutOfMemoryError e) {
// ignore
}
}
}
/**
@ -707,41 +766,18 @@ final class VirtualThread extends BaseVirtualThread {
void tryYield() {
assert Thread.currentThread() == this;
setState(YIELDING);
boolean yielded = false;
try {
yieldContinuation();
yielded = yieldContinuation(); // may throw
} finally {
assert Thread.currentThread() == this;
if (state() != RUNNING) {
assert (Thread.currentThread() == this) && (yielded == (state() == RUNNING));
if (!yielded) {
assert state() == YIELDING;
setState(RUNNING);
}
}
}
/**
* Sleep the current virtual thread for the given sleep time.
*
* @param nanos the maximum number of nanoseconds to sleep
* @throws InterruptedException if interrupted while sleeping
*/
void sleepNanos(long nanos) throws InterruptedException {
assert Thread.currentThread() == this;
if (nanos >= 0) {
if (ThreadSleepEvent.isTurnedOn()) {
ThreadSleepEvent event = new ThreadSleepEvent();
try {
event.time = nanos;
event.begin();
doSleepNanos(nanos);
} finally {
event.commit();
}
} else {
doSleepNanos(nanos);
}
}
}
/**
* Sleep the current thread for the given sleep time (in nanoseconds). If
* nanos is 0 then the thread will attempt to yield.
@ -751,9 +787,12 @@ final class VirtualThread extends BaseVirtualThread {
* will consume the parking permit so this method makes available the parking
* permit after the sleep. This may be observed as a spurious, but benign,
* wakeup when the thread subsequently attempts to park.
*
* @param nanos the maximum number of nanoseconds to sleep
* @throws InterruptedException if interrupted while sleeping
*/
private void doSleepNanos(long nanos) throws InterruptedException {
assert nanos >= 0;
void sleepNanos(long nanos) throws InterruptedException {
assert Thread.currentThread() == this && nanos >= 0;
if (getAndClearInterrupt())
throw new InterruptedException();
if (nanos == 0) {