mirror of
https://github.com/openjdk/jdk.git
synced 2025-08-27 14:54:52 +02:00
8287496: Alternative virtual thread implementation that maps to OS thread
Reviewed-by: rehn, mchung
This commit is contained in:
parent
199832a710
commit
6ff2d89ea1
72 changed files with 694 additions and 173 deletions
67
src/java.base/share/classes/java/lang/BaseVirtualThread.java
Normal file
67
src/java.base/share/classes/java/lang/BaseVirtualThread.java
Normal file
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
|
||||
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
||||
*
|
||||
* This code is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License version 2 only, as
|
||||
* published by the Free Software Foundation. Oracle designates this
|
||||
* particular file as subject to the "Classpath" exception as provided
|
||||
* by Oracle in the LICENSE file that accompanied this code.
|
||||
*
|
||||
* This code is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
||||
* version 2 for more details (a copy is included in the LICENSE file that
|
||||
* accompanied this code).
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License version
|
||||
* 2 along with this work; if not, write to the Free Software Foundation,
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
*
|
||||
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
||||
* or visit www.oracle.com if you need additional information or have any
|
||||
* questions.
|
||||
*/
|
||||
package java.lang;
|
||||
|
||||
/**
|
||||
* Base class for virtual thread implementations.
|
||||
*/
|
||||
sealed abstract class BaseVirtualThread extends Thread
|
||||
permits VirtualThread, ThreadBuilders.BoundVirtualThread {
|
||||
|
||||
/**
|
||||
* Initializes a virtual Thread.
|
||||
*
|
||||
* @param name thread name, can be null
|
||||
* @param characteristics thread characteristics
|
||||
* @param bound true when bound to an OS thread
|
||||
*/
|
||||
BaseVirtualThread(String name, int characteristics, boolean bound) {
|
||||
super(name, characteristics, bound);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parks the current virtual thread until the parking permit is available or
|
||||
* the thread is interrupted.
|
||||
*
|
||||
* The behavior of this method when the current thread is not this thread
|
||||
* is not defined.
|
||||
*/
|
||||
abstract void park();
|
||||
|
||||
/**
|
||||
* Parks current virtual thread up to the given waiting time until the parking
|
||||
* permit is available or the thread is interrupted.
|
||||
*
|
||||
* The behavior of this method when the current thread is not this thread
|
||||
* is not defined.
|
||||
*/
|
||||
abstract void parkNanos(long nanos);
|
||||
|
||||
/**
|
||||
* Makes available the parking permit to the given this virtual thread.
|
||||
*/
|
||||
abstract void unpark();
|
||||
}
|
||||
|
|
@ -2591,18 +2591,28 @@ public final class System {
|
|||
}
|
||||
|
||||
public void parkVirtualThread() {
|
||||
VirtualThread.park();
|
||||
Thread thread = Thread.currentThread();
|
||||
if (thread instanceof BaseVirtualThread vthread) {
|
||||
vthread.park();
|
||||
} else {
|
||||
throw new WrongThreadException();
|
||||
}
|
||||
}
|
||||
|
||||
public void parkVirtualThread(long nanos) {
|
||||
VirtualThread.parkNanos(nanos);
|
||||
Thread thread = Thread.currentThread();
|
||||
if (thread instanceof BaseVirtualThread vthread) {
|
||||
vthread.parkNanos(nanos);
|
||||
} else {
|
||||
throw new WrongThreadException();
|
||||
}
|
||||
}
|
||||
|
||||
public void unparkVirtualThread(Thread thread) {
|
||||
if (thread instanceof VirtualThread vthread) {
|
||||
if (thread instanceof BaseVirtualThread vthread) {
|
||||
vthread.unpark();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Not a virtual thread");
|
||||
throw new WrongThreadException();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,8 +33,11 @@ import java.security.ProtectionDomain;
|
|||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import jdk.internal.event.ThreadSleepEvent;
|
||||
import jdk.internal.javac.PreviewFeature;
|
||||
import jdk.internal.misc.PreviewFeatures;
|
||||
|
@ -736,8 +739,9 @@ public class Thread implements Runnable {
|
|||
*
|
||||
* @param name thread name, can be null
|
||||
* @param characteristics thread characteristics
|
||||
* @param bound true when bound to an OS thread
|
||||
*/
|
||||
Thread(String name, int characteristics) {
|
||||
Thread(String name, int characteristics, boolean bound) {
|
||||
this.tid = ThreadIdentifiers.next();
|
||||
this.name = (name != null) ? name : "";
|
||||
this.inheritedAccessControlContext = Constants.NO_PERMISSIONS_ACC;
|
||||
|
@ -767,8 +771,14 @@ public class Thread implements Runnable {
|
|||
this.contextClassLoader = ClassLoader.getSystemClassLoader();
|
||||
}
|
||||
|
||||
// no additional fields
|
||||
this.holder = null;
|
||||
// create a FieldHolder object, needed when bound to an OS thread
|
||||
if (bound) {
|
||||
ThreadGroup g = Constants.VTHREAD_GROUP;
|
||||
int pri = NORM_PRIORITY;
|
||||
this.holder = new FieldHolder(g, null, -1, pri, true);
|
||||
} else {
|
||||
this.holder = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1495,8 +1505,9 @@ public class Thread implements Runnable {
|
|||
*/
|
||||
@PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
|
||||
public static Thread startVirtualThread(Runnable task) {
|
||||
Objects.requireNonNull(task);
|
||||
PreviewFeatures.ensureEnabled();
|
||||
var thread = new VirtualThread(null, null, 0, task);
|
||||
var thread = ThreadBuilders.newVirtualThread(null, null, 0, task);
|
||||
thread.start();
|
||||
return thread;
|
||||
}
|
||||
|
@ -1511,7 +1522,7 @@ public class Thread implements Runnable {
|
|||
*/
|
||||
@PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
|
||||
public final boolean isVirtual() {
|
||||
return (this instanceof VirtualThread);
|
||||
return (this instanceof BaseVirtualThread);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2617,8 +2628,10 @@ public class Thread implements Runnable {
|
|||
StackTraceElement[][] traces = dumpThreads(threads);
|
||||
Map<Thread, StackTraceElement[]> m = HashMap.newHashMap(threads.length);
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
Thread thread = threads[i];
|
||||
StackTraceElement[] stackTrace = traces[i];
|
||||
if (stackTrace != null) {
|
||||
// BoundVirtualThread objects may be in list returned by the VM
|
||||
if (!thread.isVirtual() && stackTrace != null) {
|
||||
m.put(threads[i], stackTrace);
|
||||
}
|
||||
// else terminated so we don't put it in the map
|
||||
|
@ -2688,7 +2701,11 @@ public class Thread implements Runnable {
|
|||
* Return an array of all live threads.
|
||||
*/
|
||||
static Thread[] getAllThreads() {
|
||||
return getThreads();
|
||||
Thread[] threads = getThreads();
|
||||
return Stream.of(threads)
|
||||
// BoundVirtualThread objects may be in list returned by the VM
|
||||
.filter(Predicate.not(Thread::isVirtual))
|
||||
.toArray(Thread[]::new);
|
||||
}
|
||||
|
||||
private static native StackTraceElement[][] dumpThreads(Thread[] threads);
|
||||
|
|
|
@ -30,9 +30,12 @@ import java.lang.Thread.Builder.OfVirtual;
|
|||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import jdk.internal.misc.Unsafe;
|
||||
import jdk.internal.vm.ContinuationSupport;
|
||||
|
||||
/**
|
||||
* Defines static methods to create platform and virtual thread builders.
|
||||
|
@ -133,6 +136,9 @@ class ThreadBuilders {
|
|||
private int priority;
|
||||
private long stackSize;
|
||||
|
||||
PlatformThreadBuilder() {
|
||||
}
|
||||
|
||||
@Override
|
||||
String nextThreadName() {
|
||||
String name = super.nextThreadName();
|
||||
|
@ -203,12 +209,22 @@ class ThreadBuilders {
|
|||
*/
|
||||
static final class VirtualThreadBuilder
|
||||
extends BaseThreadBuilder<OfVirtual> implements OfVirtual {
|
||||
private Executor scheduler; // set by tests
|
||||
private Executor scheduler;
|
||||
|
||||
VirtualThreadBuilder() {
|
||||
}
|
||||
|
||||
// invoked by tests
|
||||
VirtualThreadBuilder(Executor scheduler) {
|
||||
if (!ContinuationSupport.isSupported())
|
||||
throw new UnsupportedOperationException();
|
||||
this.scheduler = Objects.requireNonNull(scheduler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread unstarted(Runnable task) {
|
||||
Objects.requireNonNull(task);
|
||||
var thread = new VirtualThread(scheduler, nextThreadName(), characteristics(), task);
|
||||
var thread = newVirtualThread(scheduler, nextThreadName(), characteristics(), task);
|
||||
UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
|
||||
if (uhe != null)
|
||||
thread.uncaughtExceptionHandler(uhe);
|
||||
|
@ -349,11 +365,82 @@ class ThreadBuilders {
|
|||
public Thread newThread(Runnable task) {
|
||||
Objects.requireNonNull(task);
|
||||
String name = nextThreadName();
|
||||
Thread thread = new VirtualThread(scheduler, name, characteristics(), task);
|
||||
Thread thread = newVirtualThread(scheduler, name, characteristics(), task);
|
||||
UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
|
||||
if (uhe != null)
|
||||
thread.uncaughtExceptionHandler(uhe);
|
||||
return thread;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new virtual thread to run the given task.
|
||||
*/
|
||||
static Thread newVirtualThread(Executor scheduler,
|
||||
String name,
|
||||
int characteristics,
|
||||
Runnable task) {
|
||||
if (ContinuationSupport.isSupported()) {
|
||||
return new VirtualThread(scheduler, name, characteristics, task);
|
||||
} else {
|
||||
if (scheduler != null)
|
||||
throw new UnsupportedOperationException();
|
||||
return new BoundVirtualThread(name, characteristics, task);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A "virtual thread" that is backed by a platform thread. This implementation
|
||||
* is intended for platforms that don't have the underlying VM support for
|
||||
* continuations. It can also be used for testing.
|
||||
*/
|
||||
static final class BoundVirtualThread extends BaseVirtualThread {
|
||||
private static final Unsafe U = Unsafe.getUnsafe();
|
||||
private final Runnable task;
|
||||
private boolean runInvoked;
|
||||
|
||||
BoundVirtualThread(String name, int characteristics, Runnable task) {
|
||||
super(name, characteristics, true);
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// run is specified to do nothing when Thread is a virtual thread
|
||||
if (Thread.currentThread() == this && !runInvoked) {
|
||||
runInvoked = true;
|
||||
task.run();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void park() {
|
||||
U.park(false, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
void parkNanos(long nanos) {
|
||||
U.park(false, nanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
void unpark() {
|
||||
U.unpark(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("VirtualThread[#");
|
||||
sb.append(threadId());
|
||||
String name = getName();
|
||||
if (!name.isEmpty()) {
|
||||
sb.append(",");
|
||||
sb.append(name);
|
||||
}
|
||||
sb.append("]/");
|
||||
String stateAsString = threadState().toString();
|
||||
sb.append(stateAsString.toLowerCase(Locale.ROOT));
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,7 +62,7 @@ import static java.util.concurrent.TimeUnit.*;
|
|||
* A thread that is scheduled by the Java virtual machine rather than the operating
|
||||
* system.
|
||||
*/
|
||||
final class VirtualThread extends Thread {
|
||||
final class VirtualThread extends BaseVirtualThread {
|
||||
private static final Unsafe U = Unsafe.getUnsafe();
|
||||
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
|
||||
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
|
||||
|
@ -148,7 +148,7 @@ final class VirtualThread extends Thread {
|
|||
* @param task the task to execute
|
||||
*/
|
||||
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
|
||||
super(name, characteristics);
|
||||
super(name, characteristics, /*bound*/ false);
|
||||
Objects.requireNonNull(task);
|
||||
|
||||
// choose scheduler if not specified
|
||||
|
@ -480,23 +480,15 @@ final class VirtualThread extends Thread {
|
|||
// do nothing
|
||||
}
|
||||
|
||||
/**
|
||||
* Parks the current virtual thread until unparked or interrupted.
|
||||
*/
|
||||
static void park() {
|
||||
if (currentThread() instanceof VirtualThread vthread) {
|
||||
vthread.doPark();
|
||||
} else {
|
||||
throw new WrongThreadException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parks until unparked or interrupted. If already unparked then the parking
|
||||
* permit is consumed and this method completes immediately (meaning it doesn't
|
||||
* yield). It also completes immediately if the interrupt status is set.
|
||||
*/
|
||||
private void doPark() {
|
||||
@Override
|
||||
void park() {
|
||||
assert Thread.currentThread() == this;
|
||||
|
||||
// complete immediately if parking permit available or interrupted
|
||||
if (getAndSetParkPermit(false) || interrupted)
|
||||
return;
|
||||
|
@ -513,20 +505,6 @@ final class VirtualThread extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parks the current virtual thread up to the given waiting time or until
|
||||
* unparked or interrupted.
|
||||
*
|
||||
* @param nanos the maximum number of nanoseconds to wait
|
||||
*/
|
||||
static void parkNanos(long nanos) {
|
||||
if (currentThread() instanceof VirtualThread vthread) {
|
||||
vthread.doParkNanos(nanos);
|
||||
} else {
|
||||
throw new WrongThreadException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parks up to the given waiting time or until unparked or interrupted.
|
||||
* If already unparked then the parking permit is consumed and this method
|
||||
|
@ -535,7 +513,8 @@ final class VirtualThread extends Thread {
|
|||
*
|
||||
* @param nanos the maximum number of nanoseconds to wait.
|
||||
*/
|
||||
private void doParkNanos(long nanos) {
|
||||
@Override
|
||||
void parkNanos(long nanos) {
|
||||
assert Thread.currentThread() == this;
|
||||
|
||||
// complete immediately if parking permit available or interrupted
|
||||
|
@ -638,6 +617,7 @@ final class VirtualThread extends Thread {
|
|||
* not to block.
|
||||
* @throws RejectedExecutionException if the scheduler cannot accept a task
|
||||
*/
|
||||
@Override
|
||||
@ChangesCurrentThread
|
||||
void unpark() {
|
||||
Thread currentThread = Thread.currentThread();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue