8338146: Improve Exchanger performance with VirtualThreads

Reviewed-by: alanb
Doug Lea 2024-08-21 18:22:24 +00:00
parent e297e8817f
commit ab8071d280
3 changed files with 251 additions and 329 deletions

java/util/concurrent/Exchanger.java

@@ -139,125 +139,109 @@ public class Exchanger<V> {
     * able to exchange items. That is, we cannot completely partition
     * across threads, but instead give threads arena indices that
     * will on average grow under contention and shrink under lack of
     * contention.
     *
     * We approach this by defining the Nodes holding references to
     * transferred items as ThreadLocals, and include in them
     * per-thread index and related bookkeeping state. We can safely
     * reuse per-thread nodes rather than creating them fresh each
     * time because slots alternate between pointing to a node vs
     * null, so cannot encounter ABA problems. However, we must ensure
     * that object transfer fields are reset between uses. Given this,
     * Participant nodes can be defined as static ThreadLocals. As
     * seen for example in class Striped64, using indices established
     * in one instance across others usually improves overall
     * performance. Nodes also include a participant-local random
     * number generator.
     *
     * Spreading out contention requires that the memory locations
     * used by the arena slots don't share a cache line -- otherwise,
     * the arena would have almost no benefit. We arrange this by
     * adding another level of indirection: The arena elements point
     * to "Slots", each of which is padded using @Contended. We only
     * create a single Slot on initialization, adding more when
     * needed. The per-thread Participant Nodes may also be subject to
     * false-sharing contention, but tend to be more scattered in
     * memory, so are unpadded, with some occasional performance impact.
     *
     * The arena starts out with only one used slot. We expand the
     * effective arena size by tracking collisions; i.e., failed CASes
     * while trying to exchange. And shrink it via "spinouts" in which
     * threads give up waiting at a slot. By nature of the above
     * algorithm, the only kinds of collision that reliably indicate
     * contention are when two attempted releases collide -- one of
     * two attempted offers can legitimately fail to CAS without
     * indicating contention by more than one other thread.
     *
     * Arena size (the value of field "bound") is controlled by random
     * sampling. On each miss (collision or spinout), a thread chooses
     * a new random index within the arena. Upon the third collision
     * with the same current bound, it tries to grow the arena. And
     * upon the second spinout, it tries to shrink. The asymmetry in
     * part reflects relative costs, and reduces flailing. Because
     * they cannot be changed without also changing the sampling
     * strategy, these rules are directly incorporated into uses of
     * the xchg "misses" variable. The bound field is tagged with
     * sequence numbers to reduce stale decisions. Uniform random
     * indices are generated using XorShift with enough bits so that
     * bias (See Knuth TAoCP vol 2) is negligible for moduli used here
     * (at most 256) without requiring rejection tests. Using
     * nonuniform randoms with greater weight to higher indices is
     * also possible but does not seem worthwhile in practice.
     *
     * These mechanics rely on a reasonable choice of constant SPINS.
     * The time cost of SPINS * Thread.onSpinWait() should be at least
     * the expected cost of a park/unpark context switch, and larger
     * than that of two failed CASes, but still small enough to avoid
     * excessive delays during arena shrinkage. We also deal with the
     * possibility that when an offering thread waits for a release,
     * spin-waiting would be useless because the releasing thread is
     * descheduled. On multiprocessors, we cannot know this in
     * general. But when Virtual Threads are used, method
     * ForkJoinWorkerThread.hasKnownQueuedWork serves as a guide to
     * whether to spin or immediately block, allowing a context switch
     * that may enable a releaser. Note also that when many threads
     * are being run on few cores, encountering enough collisions to
     * trigger arena growth is rare, and soon followed by shrinkage,
     * so this doesn't require special handling.
     *
     * The basic exchange mechanics rely on checks that Node item
     * fields are not null, which doesn't work when offered items are
     * null. We trap this case by translating nulls to the
     * (un-Exchangeable) value of the static Participant
     * reference.
     *
     * Essentially all of the implementation is in method xchg. As is
     * too common in this sort of code, most of the logic relies on
     * reads of fields that are maintained as local variables so can't
     * be nicely factored. It is structured as a main loop with a
     * leading volatile read (of field bound), that causes others to
     * be freshly read even though declared in plain mode. We don't
     * use compareAndExchange that would otherwise save some re-reads
     * because of the need to recheck indices and bounds on failures.
     *
     * Support for optional timeouts in a single method adds further
     * complexity. Note that for the sake of arena bounds control,
     * time bounds must be ignored during spinouts, which may delay
     * TimeoutExceptions (but no more so than would excessive context
     * switching that could occur otherwise). Responses to
     * interruption are handled similarly, postponing commitment to
     * throw InterruptedException until successfully cancelled.
     *
     * Design differences from previous releases include:
     * * Accommodation of VirtualThreads.
     * * Use of Slots vs spaced indices for the arena and static
     *   ThreadLocals, avoiding separate arena vs non-arena modes.
     * * Use of random sampling for grow/shrink decisions, with typically
     *   faster and more stable adaptation (as was mentioned as a
     *   possible improvement in previous version).
     */
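
As an aside for readers of the comment above, the XorShift index selection it describes can be sketched in isolation roughly as follows. The names here are hypothetical and this is not the class's code path (xchg inlines the step and simply retries out-of-range results); the sketch uses floorMod so it stands alone.

    import java.util.concurrent.ThreadLocalRandom;

    // Illustrative sketch of uniform arena-index selection via XorShift,
    // as described in the class comment; not the patch's actual code path.
    final class XorShiftIndexSketch {
        private long seed = ThreadLocalRandom.current().nextLong() | 1L; // nonzero seed

        int nextIndex(int m) {                          // m = current bound, at most 255
            long r = seed;
            r ^= r << 13; r ^= r >>> 7; r ^= r << 17;   // xorShift step
            seed = r;
            return (int) Math.floorMod(r, (long) (m + 1)); // keep result in [0, m]
        }
    }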
    /**
     * The maximum supported arena index. The maximum allocatable
     * arena size is MMASK + 1. Must be a power of two minus one. The
     * cap of 255 (0xff) more than suffices for the expected scaling
     * limits of the main algorithms.
     */
    private static final int MMASK = 0xff;
@@ -267,49 +251,34 @@ public class Exchanger<V> {
     */
    private static final int SEQ = MMASK + 1;

    /**
     * The bound for spins while waiting for a match before either
     * blocking or possibly shrinking arena.
     */
    private static final int SPINS = 1 << 10;

    /**
     * Padded arena cells to avoid false-sharing memory contention
     */
    @jdk.internal.vm.annotation.Contended
    static final class Slot {
        Node entry;
    }

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping.
     */
    static final class Node {
        long seed;              // Random seed
        int index;              // Arena index
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
        Node() {
            index = -1;         // initialize on first use
            seed = Thread.currentThread().threadId();
        }
    }
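
For background only (not part of the patch): @Contended asks the VM to pad each Slot so neighboring arena cells land on different cache lines. Outside the JDK, where that internal annotation is unavailable, the same layout goal is sometimes approximated with manual padding, sketched below under the assumption of a cache line no larger than 128 bytes; the JVM does not guarantee field layout, so this only illustrates the intent.

    // Background sketch of the layout goal behind @Contended; illustrative only.
    final class PaddedCellSketch {
        long p00, p01, p02, p03, p04, p05, p06, p07,
             p08, p09, p10, p11, p12, p13, p14, p15;    // ~128 bytes of padding
        volatile Object entry;                          // the hot, CASed field
        long q00, q01, q02, q03, q04, q05, q06, q07,
             q08, q09, q10, q11, q12, q13, q14, q15;    // padding on the other side
    }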
    /** The corresponding thread local class */
@@ -318,210 +287,152 @@ public class Exchanger<V> {
    }

    /**
     * The participant thread-locals. Because it is impossible to
     * exchange, we also use this reference for dealing with null user
     * arguments that are translated in and out of this value
     * surrounding use.
     */
    private static final Participant participant = new Participant();

    /**
     * Elimination array; element accesses use emulation of volatile
     * gets and CAS.
     */
    private final Slot[] arena;

    /**
     * Number of cores, for sizing and spin control. Computed only
     * upon construction.
     */
    private final int ncpu;

    /**
     * The index of the largest valid arena position.
     */
    private volatile int bound;
    /**
     * Exchange function. See above for explanation.
     *
     * @param x the item to exchange
     * @param deadline if zero, untimed, else timeout deadline
     * @return the other thread's item
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if deadline nonzero and timed out
     */
    private final V xchg(V x, long deadline)
        throws InterruptedException, TimeoutException {
        Slot[] a = arena;
        int alen = a.length;
        Participant ps = participant;
        Object item = (x == null) ? ps : x;     // translate nulls
        Node p = ps.get();
        int i = p.index;                        // if < 0, move
        int misses = 0;                         // ++ on collide, -- on spinout
        Object offered = null;                  // for cleanup
        Object v = null;
        outer: for (;;) {
            int b, m; Slot s; Node q;
            if ((m = (b = bound) & MMASK) == 0) // volatile read
                i = 0;
            if (i < 0 || i > m || i >= alen || (s = a[i]) == null) {
                long r = p.seed;                // randomly move
                r ^= r << 13; r ^= r >>> 7; r ^= r << 17; // xorShift
                i = p.index = (int)((p.seed = r) % (m + 1));
            }
            else if ((q = s.entry) != null) {   // try release
                if (ENTRY.compareAndSet(s, q, null)) {
                    Thread w;
                    v = q.item;
                    q.match = item;
                    if (i == 0 && (w = q.parked) != null)
                        LockSupport.unpark(w);
                    break;
                }
                else {                          // collision
                    int nb;
                    i = -1;                     // move index
                    if (b != bound)             // stale
                        misses = 0;
                    else if (misses <= 2)       // continue sampling
                        ++misses;
                    else if ((nb = (b + 1) & MMASK) < alen) {
                        misses = 0;             // try to grow
                        if (BOUND.compareAndSet(this, b, b + 1 + SEQ) &&
                            a[i = p.index = nb] == null)
                            AA.compareAndSet(a, nb, null, new Slot());
                    }
                }
            }
            else {                              // try offer
                if (offered == null)
                    offered = p.item = item;
                if (ENTRY.compareAndSet(s, null, p)) {
                    boolean tryCancel;          // true if interrupted
                    Thread t = Thread.currentThread();
                    if (!(tryCancel = t.isInterrupted()) && ncpu > 1 &&
                        (i != 0 ||              // check for busy VTs
                         (!ForkJoinWorkerThread.hasKnownQueuedWork()))) {
                        for (int j = SPINS; j > 0; --j) {
                            if ((v = p.match) != null) {
                                MATCH.set(p, null);
                                break outer;    // spin wait
                            }
                            Thread.onSpinWait();
                        }
                    }
                    for (long ns = 1L;;) {      // block or cancel offer
                        if ((v = p.match) != null) {
                            MATCH.set(p, null);
                            break outer;
                        }
                        if (i == 0 && !tryCancel &&
                            (deadline == 0L ||
                             ((ns = deadline - System.nanoTime()) > 0L))) {
                            p.parked = t;       // enable unpark and recheck
                            if (p.match == null) {
                                if (deadline == 0L)
                                    LockSupport.park(this);
                                else
                                    LockSupport.parkNanos(this, ns);
                                tryCancel = t.isInterrupted();
                            }
                            p.parked = null;
                        }
                        else if (ENTRY.compareAndSet(s, p, null)) { // cancel
                            offered = p.item = null;
                            if (Thread.interrupted())
                                throw new InterruptedException();
                            if (deadline != 0L && ns <= 0L)
                                throw new TimeoutException();
                            i = -1;             // move and restart
                            if (bound != b)
                                misses = 0;     // stale
                            else if (misses >= 0)
                                --misses;       // continue sampling
                            else if ((b & MMASK) != 0) {
                                misses = 0;     // try to shrink
                                BOUND.compareAndSet(this, b, b - 1 + SEQ);
                            }
                            continue outer;
                        }
                    }
                }
            }
        }
        if (offered != null)                    // cleanup
            p.item = null;
        @SuppressWarnings("unchecked") V ret = (v == participant) ? null : (V)v;
        return ret;
    }
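
To make the release/offer roles in xchg easier to follow, here is a stripped-down single-slot model of the same exchange idea. It is a pedagogical sketch only, with hypothetical names and no arena, timeouts, interrupt handling, null translation, spinning, or padding.

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.LockSupport;

    // Pedagogical single-slot model of the exchange protocol; not the patch's code.
    final class OneSlotExchanger<V> {
        static final class Node {
            Object item;                    // this thread's offered item
            volatile Object match;          // item provided by the releasing thread
            volatile Thread parked;         // waiter, if parked
        }
        private final AtomicReference<Node> slot = new AtomicReference<>();

        @SuppressWarnings("unchecked")
        V exchange(V x) {
            for (;;) {
                Node q = slot.get();
                if (q != null && slot.compareAndSet(q, null)) {
                    Object v = q.item;      // release: take the waiter's item,
                    q.match = x;            // hand over ours, and wake it
                    Thread w = q.parked;
                    if (w != null)
                        LockSupport.unpark(w);
                    return (V) v;
                }
                Node p = new Node();
                p.item = x;
                if (slot.compareAndSet(null, p)) {  // offer: install and wait
                    Object v;
                    while ((v = p.match) == null) {
                        p.parked = Thread.currentThread();
                        if (p.match == null)
                            LockSupport.park(this);
                        p.parked = null;
                    }
                    return (V) v;
                }
            }
        }
    }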
    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        int h = (ncpu = Runtime.getRuntime().availableProcessors()) >>> 1;
        int size = (h == 0) ? 1 : (h > MMASK) ? MMASK + 1 : h;
        (arena = new Slot[size])[0] = new Slot();
    }
    /**
@@ -557,17 +468,12 @@ public class Exchanger<V> {
     * @throws InterruptedException if the current thread was
     * interrupted while waiting
     */
    public V exchange(V x) throws InterruptedException {
        try {
            return xchg(x, 0L);
        } catch (TimeoutException cannotHappen) {
            return null; // not reached
        }
    }
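
A usage sketch (not part of the patch) of the public API this change speeds up: two virtual threads pairing up through one Exchanger, each receiving the other's item.

    import java.util.concurrent.Exchanger;

    public class ExchangeDemo {
        public static void main(String[] args) throws InterruptedException {
            Exchanger<String> ex = new Exchanger<>();
            Runnable task = () -> {
                try {
                    String got = ex.exchange(Thread.currentThread().getName());
                    System.out.println(Thread.currentThread().getName() + " got " + got);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            };
            Thread t1 = Thread.ofVirtual().name("vt-1").start(task);
            Thread t2 = Thread.ofVirtual().name("vt-2").start(task);
            t1.join();
            t2.join();
        }
    }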
    /**
@@ -612,34 +518,24 @@ public class Exchanger<V> {
     * @throws TimeoutException if the specified waiting time elapses
     * before another thread enters the exchange
     */
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        long d = unit.toNanos(timeout) + System.nanoTime();
        return xchg(x, (d == 0L) ? 1L : d); // avoid zero deadline
    }

    // VarHandle mechanics
    private static final VarHandle BOUND;
    private static final VarHandle MATCH;
    private static final VarHandle ENTRY;
    private static final VarHandle AA;

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            BOUND = l.findVarHandle(Exchanger.class, "bound", int.class);
            MATCH = l.findVarHandle(Node.class, "match", Object.class);
            ENTRY = l.findVarHandle(Slot.class, "entry", Node.class);
            AA = MethodHandles.arrayElementVarHandle(Slot[].class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
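
And a sketch of the timed form (again not part of the patch): as the class comment notes, the TimeoutException may be slightly delayed while the arena shrinks, but is still thrown once no partner arrives.

    import java.util.concurrent.Exchanger;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class TimedExchangeDemo {
        public static void main(String[] args) throws InterruptedException {
            Exchanger<String> ex = new Exchanger<>();
            try {
                // No partner thread exists, so this call times out.
                ex.exchange("lonely", 50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                System.out.println("timed out waiting for a partner");
            }
        }
    }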

java/util/concurrent/ForkJoinWorkerThread.java

@@ -39,6 +39,8 @@ import java.security.AccessController;
import java.security.AccessControlContext;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;

/**
 * A thread managed by a {@link ForkJoinPool}, which executes
@@ -202,6 +204,30 @@ public class ForkJoinWorkerThread extends Thread {
        }
    }
    /**
     * Returns true if the current task is being executed by a
     * ForkJoinWorkerThread that is momentarily known to have one or
     * more queued tasks that it could execute immediately. This
     * method is approximate and useful only as a heuristic indicator
     * within a running task.
     *
     * @return true if the current task is being executed by a worker
     * that has queued work
     */
    static boolean hasKnownQueuedWork() {
        ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue q, sq;
        ForkJoinPool p; ForkJoinPool.WorkQueue[] qs; int i;
        Thread c = JLA.currentCarrierThread();
        return ((c instanceof ForkJoinWorkerThread) &&
                (p = (wt = (ForkJoinWorkerThread)c).pool) != null &&
                (q = wt.workQueue) != null &&
                (i = q.source) >= 0 &&  // check local and current source queues
                (((qs = p.queues) != null && qs.length > i &&
                  (sq = qs[i]) != null && sq.top - sq.base > 0) ||
                 q.top - q.base > 0));
    }

    private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
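
The general shape this heuristic gates in the Exchanger and LinkedTransferQueue changes can be sketched as below; the helper and its parameters are hypothetical, standing in for the internal fields those classes actually consult.

    import java.util.concurrent.locks.LockSupport;
    import java.util.function.BooleanSupplier;

    // Hypothetical helper illustrating the spin-then-park decision:
    // skip the spin phase when the carrier thread already has queued work,
    // so a context switch can let a releasing task run instead.
    final class SpinOrParkSketch {
        static final int SPINS = 1 << 10;

        static void awaitMatch(BooleanSupplier released, boolean carrierHasQueuedWork,
                               Object blocker) {
            if (!carrierHasQueuedWork) {
                for (int j = SPINS; j > 0 && !released.getAsBoolean(); --j)
                    Thread.onSpinWait();            // brief spin; release is likely soon
            }
            while (!released.getAsBoolean())
                LockSupport.park(blocker);          // park until unparked (or spuriously woken)
        }
    }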
    /**
     * A worker thread that has no permissions, is not a member of any
     * user-defined ThreadGroup, uses the system class loader as

java/util/concurrent/LinkedTransferQueue.java

@@ -426,8 +426,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
        long deadline = (timed) ? System.nanoTime() + ns : 0L;
        boolean upc = isUniprocessor;       // don't spin but later recheck
        Thread w = Thread.currentThread();
        if (spin && ForkJoinWorkerThread.hasKnownQueuedWork())
            spin = false;                   // don't spin
        int spins = (spin & !upc) ? SPINS : 0; // negative when may park
        while ((m = item) == e) {
            if (spins >= 0) {