8296896: Change virtual Thread.yield to use external submit

Reviewed-by: jpai, rpressler
This commit is contained in:
Alan Bateman 2022-12-08 07:37:32 +00:00
parent 51759650e5
commit 1166c8e2c0
3 changed files with 176 additions and 27 deletions

View file

@ -224,45 +224,59 @@ final class VirtualThread extends BaseVirtualThread {
}
/**
* Submits the runContinuation task to the scheduler.
* @param {@code lazySubmit} to lazy submit
* Submits the runContinuation task to the scheduler. For the default scheduler,
* and calling it on a worker thread, the task will be pushed to the local queue,
* otherwise it will be pushed to a submission queue.
*
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
try {
scheduler.execute(runContinuation);
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
}
/**
* Submits the runContinuation task to the scheduler with a lazy submit.
* @throws RejectedExecutionException
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
*/
private void submitRunContinuation(boolean lazySubmit) {
private void lazySubmitRunContinuation(ForkJoinPool pool) {
try {
if (lazySubmit && scheduler instanceof ForkJoinPool pool) {
pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
} else {
scheduler.execute(runContinuation);
}
} catch (RejectedExecutionException ree) {
// record event
submitFailed(ree);
throw ree;
}
}
/**
* Submits the runContinuation task to the scheduler as an external submit.
* @throws RejectedExecutionException
* @see ForkJoinPool#externalSubmit(ForkJoinTask)
*/
private void externalSubmitRunContinuation(ForkJoinPool pool) {
try {
pool.externalSubmit(ForkJoinTask.adapt(runContinuation));
} catch (RejectedExecutionException ree) {
submitFailed(ree);
throw ree;
}
}
/**
* If enabled, emits a JFR VirtualThreadSubmitFailedEvent.
*/
private void submitFailed(RejectedExecutionException ree) {
var event = new VirtualThreadSubmitFailedEvent();
if (event.isEnabled()) {
event.javaThreadId = threadId();
event.exceptionMessage = ree.getMessage();
event.commit();
}
throw ree;
}
}
/**
* Submits the runContinuation task to the scheduler.
* @throws RejectedExecutionException
*/
private void submitRunContinuation() {
submitRunContinuation(false);
}
/**
* Submits the runContinuation task to the scheduler and without signalling
* any threads if possible.
* @throws RejectedExecutionException
*/
private void lazySubmitRunContinuation() {
submitRunContinuation(true);
}
/**
@ -437,7 +451,12 @@ final class VirtualThread extends BaseVirtualThread {
// may have been unparked while parking
if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
// lazy submit to continue on the current thread as carrier if possible
lazySubmitRunContinuation();
if (currentThread() instanceof CarrierThread ct) {
lazySubmitRunContinuation(ct.getPool());
} else {
submitRunContinuation();
}
}
} else if (s == YIELDING) { // Thread.yield
setState(RUNNABLE);
@ -445,8 +464,12 @@ final class VirtualThread extends BaseVirtualThread {
// notify JVMTI that unmount has completed, thread is runnable
if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);
// lazy submit to continue on the current thread as carrier if possible
lazySubmitRunContinuation();
// external submit if there are no tasks in the local task queue
if (currentThread() instanceof CarrierThread ct && ct.getQueuedTaskCount() == 0) {
externalSubmitRunContinuation(ct.getPool());
} else {
submitRunContinuation();
}
}
}

View file

@ -0,0 +1,126 @@
/*
* Copyright (c) 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/**
* @test
* @summary Test Thread.yield submits the virtual thread task to the expected queue
* @requires vm.continuations
* @enablePreview
* @run junit/othervm -Djdk.virtualThreadScheduler.maxPoolSize=1 YieldQueuing
*/
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
class YieldQueuing {
/**
* Test Thread.yield submits the task for the current virtual thread to a scheduler
* submission queue when there are no tasks in the local queue.
*/
@Test
void testYieldWithEmptyLocalQueue() throws Exception {
var list = new CopyOnWriteArrayList<String>();
var threadsStarted = new AtomicBoolean();
var threadA = Thread.ofVirtual().unstarted(() -> {
// pin thread until task for B is in submission queue
while (!threadsStarted.get()) {
Thread.onSpinWait();
}
list.add("A");
Thread.yield(); // push task for A to submission queue, B should run
list.add("A");
});
var threadB = Thread.ofVirtual().unstarted(() -> {
list.add("B");
});
// push tasks for A and B to submission queue
threadA.start();
threadB.start();
// release A
threadsStarted.set(true);
// wait for result
threadA.join();
threadB.join();
assertEquals(list, List.of("A", "B", "A"));
}
/**
* Test Thread.yield submits the task for the current virtual thread to the local
* queue when there are tasks in the local queue.
*/
@Test
void testYieldWithNonEmptyLocalQueue() throws Exception {
var list = new CopyOnWriteArrayList<String>();
var threadsStarted = new AtomicBoolean();
var threadA = Thread.ofVirtual().unstarted(() -> {
// pin thread until tasks for B and C are in submission queue
while (!threadsStarted.get()) {
Thread.onSpinWait();
}
list.add("A");
LockSupport.park(); // B should run
list.add("A");
});
var threadB = Thread.ofVirtual().unstarted(() -> {
list.add("B");
LockSupport.unpark(threadA); // push task for A to local queue
Thread.yield(); // push task for B to local queue, A should run
list.add("B");
});
var threadC = Thread.ofVirtual().unstarted(() -> {
list.add("C");
});
// push tasks for A, B and C to submission queue
threadA.start();
threadB.start();
threadC.start();
// release A
threadsStarted.set(true);
// wait for result
threadA.join();
threadB.join();
threadC.join();
assertEquals(list, List.of("A", "B", "A", "B", "C"));
}
}

View file

@ -26,7 +26,7 @@
* @summary Stress test Thread.yield
* @requires vm.debug != true
* @enablePreview
* @run main YieldALot 500000
* @run main YieldALot 350000
*/
/*