Mirror of https://github.com/openjdk/jdk.git (synced 2025-08-28 07:14:30 +02:00)

8246677: LinkedTransferQueue and SynchronousQueue synchronization updates
Reviewed-by: alanb, dl

parent 5cfa8c94d6, commit 63e3bd7613
3 changed files with 245 additions and 396 deletions
@@ -309,31 +309,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * 2. Await match or cancellation (method awaitMatch)
   *
   * Wait for another thread to match node; instead cancelling if
-  * the current thread was interrupted or the wait timed out. On
-  * multiprocessors, we use front-of-queue spinning: If a node
-  * appears to be the first unmatched node in the queue, it
-  * spins a bit before blocking. In either case, before blocking
-  * it tries to unsplice any nodes between the current "head"
-  * and the first unmatched node.
-  *
-  * Front-of-queue spinning vastly improves performance of
-  * heavily contended queues. And so long as it is relatively
-  * brief and "quiet", spinning does not much impact performance
-  * of less-contended queues. During spins threads check their
-  * interrupt status and generate a thread-local random number
-  * to decide to occasionally perform a Thread.yield. While
-  * yield has underdefined specs, we assume that it might help,
-  * and will not hurt, in limiting impact of spinning on busy
-  * systems. We also use smaller (1/2) spins for nodes that are
-  * not known to be front but whose predecessors have not
-  * blocked -- these "chained" spins avoid artifacts of
-  * front-of-queue rules which otherwise lead to alternating
-  * nodes spinning vs blocking. Further, front threads that
-  * represent phase changes (from data to request node or vice
-  * versa) compared to their predecessors receive additional
-  * chained spins, reflecting longer paths typically required to
-  * unblock threads during phase changes.
-  *
+  * the current thread was interrupted or the wait timed out. To
+  * improve performance in common single-source / single-sink
+  * usages when there are more tasks that cores, an initial
+  * Thread.yield is tried when there is apparently only one
+  * waiter. In other cases, waiters may help with some
+  * bookkeeping, then park/unpark.
   *
   * ** Unlinking removed interior nodes **
   *
@@ -369,30 +350,9 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   *
   * When these cases arise, rather than always retraversing the
   * entire list to find an actual predecessor to unlink (which
-  * won't help for case (1) anyway), we record a conservative
-  * estimate of possible unsplice failures (in "sweepVotes").
-  * We trigger a full sweep when the estimate exceeds a threshold
-  * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
-  * removal failures to tolerate before sweeping through, unlinking
-  * cancelled nodes that were not unlinked upon initial removal.
-  * We perform sweeps by the thread hitting threshold (rather than
-  * background threads or by spreading work to other threads)
-  * because in the main contexts in which removal occurs, the
-  * caller is timed-out or cancelled, which are not time-critical
-  * enough to warrant the overhead that alternatives would impose
-  * on other threads.
-  *
-  * Because the sweepVotes estimate is conservative, and because
-  * nodes become unlinked "naturally" as they fall off the head of
-  * the queue, and because we allow votes to accumulate even while
-  * sweeps are in progress, there are typically significantly fewer
-  * such nodes than estimated. Choice of a threshold value
-  * balances the likelihood of wasted effort and contention, versus
-  * providing a worst-case bound on retention of interior nodes in
-  * quiescent queues. The value defined below was chosen
-  * empirically to balance these under various timeout scenarios.
-  *
-  * Because traversal operations on the linked list of nodes are a
+  * won't help for case (1) anyway), we record the need to sweep the
+  * next time any thread would otherwise block in awaitMatch. Also,
+  * because traversal operations on the linked list of nodes are a
   * natural opportunity to sweep dead nodes, we generally do so,
   * including all the operations that might remove elements as they
   * traverse, such as removeIf and Iterator.remove. This largely
@@ -405,28 +365,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * self-linked.
   */

- /** True if on multiprocessor */
- private static final boolean MP =
-     Runtime.getRuntime().availableProcessors() > 1;
-
  /**
-  * The number of times to spin (with randomly interspersed calls
-  * to Thread.yield) on multiprocessor before blocking when a node
-  * is apparently the first waiter in the queue. See above for
-  * explanation. Must be a power of two. The value is empirically
-  * derived -- it works pretty well across a variety of processors,
-  * numbers of CPUs, and OSes.
+  * The number of nanoseconds for which it is faster to spin
+  * rather than to use timed park. A rough estimate suffices.
+  * Using a power of two minus one simplifies some comparisons.
   */
- private static final int FRONT_SPINS = 1 << 7;
+ static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;

- /**
-  * The number of times to spin before blocking when a node is
-  * preceded by another node that is apparently spinning. Also
-  * serves as an increment to FRONT_SPINS on phase changes, and as
-  * base average frequency for yielding during spins. Must be a
-  * power of two.
-  */
- private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
-
  /**
   * The maximum number of estimated removal failures (sweepVotes)
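The SPIN_FOR_TIMEOUT_THRESHOLD constant introduced here replaces the FRONT_SPINS/CHAINED_SPINS tuning. As a rough illustration of how such a threshold is typically applied -- this is a standalone sketch, not the JDK code, and the helper name awaitNanos and its BooleanSupplier condition are invented for the example -- a timed wait only parks when the remaining time is large enough to be worth a park:

import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

class TimedWaitSketch {
    // Mirrors the constant in the patch: below this many nanoseconds a
    // timed park costs more than it saves.
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;

    /** Waits until {@code done} reports true or the timeout elapses. */
    static boolean awaitNanos(BooleanSupplier done, long nanos) {
        final long deadline = System.nanoTime() + nanos;
        while (!done.getAsBoolean()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0L)
                return false;                     // timed out
            if (remaining > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(remaining); // block for the bulk of the wait
            else
                Thread.onSpinWait();              // too close to the deadline to park
        }
        return true;
    }
}

parkNanos may return early or spuriously, so the loop always re-checks the condition and recomputes the remaining time, which is the same shape the rewritten wait loops below follow.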
@@ -442,7 +386,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * them after use. Writes that are intrinsically ordered wrt
   * other accesses or CASes use simple relaxed forms.
   */
- static final class Node {
+ static final class Node implements ForkJoinPool.ManagedBlocker {
  final boolean isData; // false if this is a request node
  volatile Object item; // initially non-null if isData; CASed to match
  volatile Node next;
@@ -487,24 +431,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  final void appendRelaxed(Node next) {
  // assert next != null;
  // assert this.next == null;
- NEXT.set(this, next);
- }
-
- /**
-  * Sets item (of a request node) to self and waiter to null,
-  * to avoid garbage retention after matching or cancelling.
-  * Uses relaxed writes because order is already constrained in
-  * the only calling contexts: item is forgotten only after
-  * volatile/atomic mechanics that extract items, and visitors
-  * of request nodes only ever check whether item is null.
-  * Similarly, clearing waiter follows either CAS or return
-  * from park (if ever parked; else we don't care).
-  */
- final void forgetContents() {
- // assert isMatched();
- if (!isData)
-     ITEM.set(this, this);
- WAITER.set(this, null);
+ NEXT.setOpaque(this, next);
  }

  /**
@@ -534,6 +461,16 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  return d != haveData && d != (item == null);
  }

+ public final boolean isReleasable() {
+     return (isData == (item == null)) ||
+         Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+     while (!isReleasable()) LockSupport.park();
+     return true;
+ }
+
  private static final long serialVersionUID = -3375979862319811754L;
  }

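Node now implements ForkJoinPool.ManagedBlocker, so a queue operation that blocks while running on a ForkJoinPool worker (for example under parallel streams or CompletableFuture callbacks) lets the pool compensate with a spare thread instead of starving. The following is a minimal sketch of how any ManagedBlocker is driven through ForkJoinPool.managedBlock; the LatchBlocker class is hypothetical and is not part of this patch:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.LockSupport;

class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private volatile boolean released;
    private volatile Thread waiter;

    void release() {                       // called by another thread
        released = true;
        Thread w = waiter;
        if (w != null)
            LockSupport.unpark(w);
    }

    @Override public boolean isReleasable() {
        return released || Thread.currentThread().isInterrupted();
    }

    @Override public boolean block() {
        waiter = Thread.currentThread();
        while (!isReleasable())
            LockSupport.park(this);
        return true;
    }

    void await() throws InterruptedException {
        // Inside a ForkJoinPool, managedBlock may add a spare worker
        // before invoking block(); elsewhere it just loops on block().
        ForkJoinPool.managedBlock(this);
        if (Thread.interrupted())
            throw new InterruptedException();
    }
}

isReleasable should be cheap and side-effect free, because managedBlock may invoke it several times around block(); the patch's Node follows the same shape, with "matched or interrupted" as the release condition.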
@@ -566,7 +503,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  private transient volatile Node tail;

  /** The number of apparent failures to unsplice cancelled nodes */
- private transient volatile int sweepVotes;
+ private transient volatile boolean needSweep;

  private boolean casTail(Node cmp, Node val) {
  // assert cmp != null;
@@ -578,11 +515,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  return HEAD.compareAndSet(this, cmp, val);
  }

- /** Atomic version of ++sweepVotes. */
- private int incSweepVotes() {
-     return (int) SWEEPVOTES.getAndAdd(this, 1) + 1;
- }
-
  /**
   * Tries to CAS pred.next (or head, if pred is null) from c to p.
   * Caller must ensure that we're not unlinking the trailing node.
@@ -689,7 +621,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  }

  /**
-  * Spins/yields/blocks until node s is matched or caller gives up.
+  * Possibly blocks until node s is matched or caller gives up.
   *
   * @param s the waiting node
   * @param pred the predecessor of s, or null if unknown (the null
@@ -700,65 +632,55 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * @param nanos timeout in nanosecs, used only if timed is true
   * @return matched item, or e if unmatched on interrupt or timeout
   */
+ @SuppressWarnings("unchecked")
  private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
+ final boolean isData = s.isData;
  final long deadline = timed ? System.nanoTime() + nanos : 0L;
- Thread w = Thread.currentThread();
- int spins = -1; // initialized after first item and cancel checks
- ThreadLocalRandom randomYields = null; // bound if needed
- for (;;) {
-     final Object item;
-     if ((item = s.item) != e) { // matched
-         // assert item != s;
-         s.forgetContents(); // avoid garbage
-         @SuppressWarnings("unchecked") E itemE = (E) item;
-         return itemE;
-     }
-     else if (w.isInterrupted() || (timed && nanos <= 0L)) {
-         // try to cancel and unlink
-         if (s.casItem(e, s.isData ? null : s)) {
-             unsplice(pred, s);
+ final Thread w = Thread.currentThread();
+ int stat = -1; // -1: may yield, +1: park, else 0
+ Object item;
+ while ((item = s.item) == e) {
+     if (needSweep) // help clean
+         sweep();
+     else if ((timed && nanos <= 0L) || w.isInterrupted()) {
+         if (s.casItem(e, (e == null) ? s : null)) {
+             unsplice(pred, s); // cancelled
              return e;
          }
-         // return normally if lost CAS
      }
-     else if (spins < 0) { // establish spins at/near front
-         if ((spins = spinsFor(pred, s.isData)) > 0)
-             randomYields = ThreadLocalRandom.current();
+     else if (stat <= 0) {
+         if (pred != null && pred.next == s) {
+             if (stat < 0 &&
+                 (pred.isData != isData || pred.isMatched())) {
+                 stat = 0; // yield once if first
+                 Thread.yield();
+             }
+             else {
+                 stat = 1;
+                 s.waiter = w; // enable unpark
+             }
+         } // else signal in progress
      }
-     else if (spins > 0) { // spin
-         --spins;
-         if (randomYields.nextInt(CHAINED_SPINS) == 0)
-             Thread.yield(); // occasionally yield
-     }
-     else if (s.waiter == null) {
-         s.waiter = w; // request unpark then recheck
-     }
-     else if (timed) {
-         nanos = deadline - System.nanoTime();
-         if (nanos > 0L)
-             LockSupport.parkNanos(this, nanos);
+     else if ((item = s.item) != e)
+         break; // recheck
+     else if (!timed) {
+         LockSupport.setCurrentBlocker(this);
+         try {
+             ForkJoinPool.managedBlock(s);
+         } catch (InterruptedException cannotHappen) { }
+         LockSupport.setCurrentBlocker(null);
      }
      else {
-         LockSupport.park(this);
+         nanos = deadline - System.nanoTime();
+         if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+             LockSupport.parkNanos(this, nanos);
      }
  }
- }
-
- /**
-  * Returns spin/yield value for a node with given predecessor and
-  * data mode. See above for explanation.
-  */
- private static int spinsFor(Node pred, boolean haveData) {
-     if (MP && pred != null) {
-         if (pred.isData != haveData) // phase change
-             return FRONT_SPINS + CHAINED_SPINS;
-         if (pred.isMatched()) // probably at front
-             return FRONT_SPINS;
-         if (pred.waiter == null) // pred apparently spinning
-             return CHAINED_SPINS;
-     }
-     return 0;
+ if (stat == 1)
+     WAITER.set(s, null);
+ if (!isData)
+     ITEM.set(s, s); // self-link to avoid garbage
+ return (E) item;
  }

  /* -------------- Traversal methods -------------- */
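The rewritten awaitMatch replaces the old spin heuristics with a small three-valued wait state: yield at most once when the node appears to be the first waiter of its kind, otherwise publish the waiter and park (through managedBlock when untimed, through parkNanos when timed, and not at all when the remainder is below the threshold). The helper below is a simplified, hedged restatement of that control flow, not the JDK method; it assumes the thread that satisfies the wait also unparks the waiter, as the queue does via the waiter field, and the names await, matched and apparentlyFirst are illustrative:

import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

final class AwaitSketch {
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; // same constant as in the patch

    /**
     * Returns true if {@code matched} became true, false if the wait was
     * abandoned because of interrupt or timeout; {@code apparentlyFirst}
     * stands in for the "predecessor is matched or of the other mode" test.
     */
    static boolean await(BooleanSupplier matched, BooleanSupplier apparentlyFirst,
                         boolean timed, long nanos) {
        long deadline = timed ? System.nanoTime() + nanos : 0L;
        Thread w = Thread.currentThread();
        int stat = -1;                        // -1: may yield, +1: parked, else 0
        while (!matched.getAsBoolean()) {
            if ((timed && (nanos = deadline - System.nanoTime()) <= 0L)
                    || w.isInterrupted())
                return false;                 // caller would now cancel and unsplice
            else if (stat <= 0) {
                if (stat < 0 && apparentlyFirst.getAsBoolean()) {
                    stat = 0;                 // yield exactly once if apparently the sole waiter
                    Thread.yield();
                } else {
                    stat = 1;                 // the real code publishes its waiter field here
                }
            } else if (!timed) {
                LockSupport.park();           // the patch routes this through managedBlock
            } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) {
                LockSupport.parkNanos(nanos); // skip the park when the remainder is tiny
            }
        }
        return true;
    }
}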
@@ -1181,8 +1103,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * See above for rationale. Briefly: if pred still points to
   * s, try to unlink s. If s cannot be unlinked, because it is
   * trailing node or pred might be unlinked, and neither pred
-  * nor s are head or offlist, add to sweepVotes, and if enough
-  * votes have accumulated, sweep.
+  * nor s are head or offlist, set needSweep;
   */
  if (pred != null && pred.next == s) {
  Node n = s.next;
@@ -1200,10 +1121,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  if (hn != h && casHead(h, hn))
      h.selfLink(); // advance head
  }
- // sweep every SWEEP_THRESHOLD votes
- if (pred.next != pred && s.next != s // recheck if offlist
-     && (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0)
-     sweep();
+ if (pred.next != pred && s.next != s)
+     needSweep = true;
  }
  }
  }
@@ -1213,6 +1132,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * traversal from head.
   */
  private void sweep() {
+ needSweep = false;
  for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
  if (!s.isMatched())
  // Unmatched nodes are never self-linked
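The sweepVotes counter-plus-threshold bookkeeping is replaced by a single needSweep flag: a failed unsplice simply sets it, and the next thread that is about to block clears it and runs the sweep ("help clean" in awaitMatch). A minimal sketch of this dirty-flag pattern, with illustrative names rather than the JDK's:

// Sketch of the "dirty flag" replacement for the old sweepVotes counter.
final class SweepFlagSketch {
    private volatile boolean needSweep;        // set instead of counting votes

    void onFailedUnsplice() {                  // was: incSweepVotes() + threshold test
        needSweep = true;
    }

    void beforeBlocking(Runnable sweep) {      // called from the wait loop ("help clean")
        if (needSweep) {
            needSweep = false;                 // sweep() in the patch clears the flag first
            sweep.run();
        }
    }
}

Losing an occasional sweep to a race is harmless here, because dead nodes are also reclaimed as they fall off the head and by ordinary traversals (removeIf, Iterator.remove), which is why a plain volatile flag suffices.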
@@ -1265,7 +1185,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * @throws NullPointerException if the specified element is null
   */
  public void put(E e) {
- xfer(e, true, ASYNC, 0);
+ xfer(e, true, ASYNC, 0L);
  }

  /**
@@ -1278,7 +1198,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * @throws NullPointerException if the specified element is null
   */
  public boolean offer(E e, long timeout, TimeUnit unit) {
- xfer(e, true, ASYNC, 0);
+ xfer(e, true, ASYNC, 0L);
  return true;
  }

@@ -1290,7 +1210,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * @throws NullPointerException if the specified element is null
   */
  public boolean offer(E e) {
- xfer(e, true, ASYNC, 0);
+ xfer(e, true, ASYNC, 0L);
  return true;
  }

@@ -1303,7 +1223,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * @throws NullPointerException if the specified element is null
   */
  public boolean add(E e) {
- xfer(e, true, ASYNC, 0);
+ xfer(e, true, ASYNC, 0L);
  return true;
  }

@@ -1318,7 +1238,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * @throws NullPointerException if the specified element is null
   */
  public boolean tryTransfer(E e) {
- return xfer(e, true, NOW, 0) == null;
+ return xfer(e, true, NOW, 0L) == null;
  }

  /**
@@ -1333,7 +1253,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
   * @throws NullPointerException if the specified element is null
   */
  public void transfer(E e) throws InterruptedException {
- if (xfer(e, true, SYNC, 0) != null) {
+ if (xfer(e, true, SYNC, 0L) != null) {
  Thread.interrupted(); // failure possible only due to interrupt
  throw new InterruptedException();
  }
@@ -1363,7 +1283,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  }

  public E take() throws InterruptedException {
- E e = xfer(null, false, SYNC, 0);
+ E e = xfer(null, false, SYNC, 0L);
  if (e != null)
  return e;
  Thread.interrupted();
@@ -1378,7 +1298,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  }

  public E poll() {
- return xfer(null, false, NOW, 0);
+ return xfer(null, false, NOW, 0L);
  }

  /**
@@ -1722,7 +1642,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  // VarHandle mechanics
  private static final VarHandle HEAD;
  private static final VarHandle TAIL;
- private static final VarHandle SWEEPVOTES;
  static final VarHandle ITEM;
  static final VarHandle NEXT;
  static final VarHandle WAITER;
@@ -1733,8 +1652,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
  Node.class);
  TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail",
  Node.class);
- SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes",
-     int.class);
  ITEM = l.findVarHandle(Node.class, "item", Object.class);
  NEXT = l.findVarHandle(Node.class, "next", Node.class);
  WAITER = l.findVarHandle(Node.class, "waiter", Thread.class);
@@ -166,6 +166,18 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
   * old head pointers), but references in Queue nodes must be
   * aggressively forgotten to avoid reachability of everything any
   * node has ever referred to since arrival.
+  *
+  * The above steps improve throughput when many threads produce
+  * and/or consume data. But they don't help much with
+  * single-source / single-sink usages in which one side or the
+  * other is always transiently blocked, and so throughput is
+  * mainly a function of thread scheduling. This is not usually
+  * noticeably improved with bounded short spin-waits. Instead both
+  * forms of transfer try Thread.yield if apparently the sole
+  * waiter. This works well when there are more tasks that cores,
+  * which is expected to be the main usage context of this mode. In
+  * other cases, waiters may help with some bookkeeping, then
+  * park/unpark.
   */

  /**
@@ -188,28 +200,11 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
  abstract E transfer(E e, boolean timed, long nanos);
  }

- /**
-  * The number of times to spin before blocking in timed waits.
-  * The value is empirically derived -- it works well across a
-  * variety of processors and OSes. Empirically, the best value
-  * seems not to vary with number of CPUs (beyond 2) so is just
-  * a constant.
-  */
- static final int MAX_TIMED_SPINS =
-     (Runtime.getRuntime().availableProcessors() < 2) ? 0 : 32;
-
- /**
-  * The number of times to spin before blocking in untimed waits.
-  * This is greater than timed value because untimed waits spin
-  * faster since they don't need to check times on each spin.
-  */
- static final int MAX_UNTIMED_SPINS = MAX_TIMED_SPINS * 16;
-
  /**
   * The number of nanoseconds for which it is faster to spin
   * rather than to use timed park. A rough estimate suffices.
   */
- static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;
+ static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L;

  /** Dual stack */
  static final class TransferStack<E> extends Transferer<E> {
@@ -233,7 +228,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
  static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

  /** Node class for TransferStacks. */
- static final class SNode {
+ static final class SNode implements ForkJoinPool.ManagedBlocker {
  volatile SNode next; // next node in stack
  volatile SNode match; // the node matched to this
  volatile Thread waiter; // to control park/unpark
@@ -261,37 +256,53 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
   * @return true if successfully matched to s
   */
  boolean tryMatch(SNode s) {
- if (match == null &&
-     SMATCH.compareAndSet(this, null, s)) {
-     Thread w = waiter;
-     if (w != null) { // waiters need at most one unpark
-         waiter = null;
-         LockSupport.unpark(w);
+ SNode m; Thread w;
+ if ((m = match) == null) {
+     if (SMATCH.compareAndSet(this, null, s)) {
+         if ((w = waiter) != null)
+             LockSupport.unpark(w);
+         return true;
      }
-     return true;
+     else
+         m = match;
  }
- return match == s;
+ return m == s;
  }

  /**
   * Tries to cancel a wait by matching node to itself.
   */
- void tryCancel() {
-     SMATCH.compareAndSet(this, null, this);
+ boolean tryCancel() {
+     return SMATCH.compareAndSet(this, null, this);
  }

  boolean isCancelled() {
  return match == this;
  }

+ public final boolean isReleasable() {
+     return match != null || Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+     while (!isReleasable()) LockSupport.park();
+     return true;
+ }
+
+ void forgetWaiter() {
+     SWAITER.setOpaque(this, null);
+ }
+
  // VarHandle mechanics
  private static final VarHandle SMATCH;
  private static final VarHandle SNEXT;
+ private static final VarHandle SWAITER;
  static {
  try {
  MethodHandles.Lookup l = MethodHandles.lookup();
  SMATCH = l.findVarHandle(SNode.class, "match", SNode.class);
  SNEXT = l.findVarHandle(SNode.class, "next", SNode.class);
+ SWAITER = l.findVarHandle(SNode.class, "waiter", Thread.class);
  } catch (ReflectiveOperationException e) {
  throw new ExceptionInInitializerError(e);
  }
@@ -358,14 +369,43 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
  else
  return null;
  } else if (casHead(h, s = snode(s, e, h, mode))) {
- SNode m = awaitFulfill(s, timed, nanos);
- if (m == s) { // wait was cancelled
-     clean(s);
-     return null;
+ long deadline = timed ? System.nanoTime() + nanos : 0L;
+ Thread w = Thread.currentThread();
+ int stat = -1; // -1: may yield, +1: park, else 0
+ SNode m; // await fulfill or cancel
+ while ((m = s.match) == null) {
+     if ((timed &&
+          (nanos = deadline - System.nanoTime()) <= 0) ||
+         w.isInterrupted()) {
+         if (s.tryCancel()) {
+             clean(s); // wait cancelled
+             return null;
+         }
+     } else if ((m = s.match) != null) {
+         break; // recheck
+     } else if (stat <= 0) {
+         if (stat < 0 && h == null && head == s) {
+             stat = 0; // yield once if was empty
+             Thread.yield();
+         } else {
+             stat = 1;
+             s.waiter = w; // enable signal
+         }
+     } else if (!timed) {
+         LockSupport.setCurrentBlocker(this);
+         try {
+             ForkJoinPool.managedBlock(s);
+         } catch (InterruptedException cannotHappen) { }
+         LockSupport.setCurrentBlocker(null);
+     } else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+         LockSupport.parkNanos(this, nanos);
  }
- if ((h = head) != null && h.next == s)
-     casHead(h, s.next); // help s's fulfiller
- return (E) ((mode == REQUEST) ? m.item : s.item);
+ if (stat == 1)
+     s.forgetWaiter();
+ Object result = (mode == REQUEST) ? m.item : s.item;
+ if (h != null && h.next == s)
+     casHead(h, s.next); // help fulfiller
+ return (E) result;
  }
  } else if (!isFulfilling(h.mode)) { // try to fulfill
  if (h.isCancelled()) // already cancelled
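Note the companion change above: tryCancel now returns whether its CAS won, so a waiter that hits the timeout or an interrupt only cleans up when it actually cancelled; if a fulfiller won the race, the loop re-reads the match and returns the fulfilled value instead. A hedged, generic sketch of that cancel-versus-fulfill race, with illustrative names that are not from the patch:

import java.util.concurrent.atomic.AtomicReference;

// A slot that can be fulfilled exactly once or cancelled exactly once.
final class CancellableSlot<T> {
    private static final Object CANCELLED = new Object();
    private final AtomicReference<Object> result = new AtomicReference<>();

    boolean tryFulfill(T value) { return result.compareAndSet(null, value); }

    boolean tryCancel()         { return result.compareAndSet(null, CANCELLED); }

    @SuppressWarnings("unchecked")
    T poll() {                  // null if cancelled or not yet fulfilled
        Object r = result.get();
        return (r == null || r == CANCELLED) ? null : (T) r;
    }
}

A timed-out waiter calls tryCancel(); on false it knows a fulfiller already installed a value and simply consumes it via poll(), which is exactly the shape of the rewritten transfer loops in both Transferer implementations.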
@@ -401,83 +441,12 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
  }
  }

- /**
-  * Spins/blocks until node s is matched by a fulfill operation.
-  *
-  * @param s the waiting node
-  * @param timed true if timed wait
-  * @param nanos timeout value
-  * @return matched node, or s if cancelled
-  */
- SNode awaitFulfill(SNode s, boolean timed, long nanos) {
-     /*
-      * When a node/thread is about to block, it sets its waiter
-      * field and then rechecks state at least one more time
-      * before actually parking, thus covering race vs
-      * fulfiller noticing that waiter is non-null so should be
-      * woken.
-      *
-      * When invoked by nodes that appear at the point of call
-      * to be at the head of the stack, calls to park are
-      * preceded by spins to avoid blocking when producers and
-      * consumers are arriving very close in time. This can
-      * happen enough to bother only on multiprocessors.
-      *
-      * The order of checks for returning out of main loop
-      * reflects fact that interrupts have precedence over
-      * normal returns, which have precedence over
-      * timeouts. (So, on timeout, one last check for match is
-      * done before giving up.) Except that calls from untimed
-      * SynchronousQueue.{poll/offer} don't check interrupts
-      * and don't wait at all, so are trapped in transfer
-      * method rather than calling awaitFulfill.
-      */
-     final long deadline = timed ? System.nanoTime() + nanos : 0L;
-     Thread w = Thread.currentThread();
-     int spins = shouldSpin(s)
-         ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
-         : 0;
-     for (;;) {
-         if (w.isInterrupted())
-             s.tryCancel();
-         SNode m = s.match;
-         if (m != null)
-             return m;
-         if (timed) {
-             nanos = deadline - System.nanoTime();
-             if (nanos <= 0L) {
-                 s.tryCancel();
-                 continue;
-             }
-         }
-         if (spins > 0) {
-             Thread.onSpinWait();
-             spins = shouldSpin(s) ? (spins - 1) : 0;
-         }
-         else if (s.waiter == null)
-             s.waiter = w; // establish waiter so can park next iter
-         else if (!timed)
-             LockSupport.park(this);
-         else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
-             LockSupport.parkNanos(this, nanos);
-     }
- }
-
- /**
-  * Returns true if node s is at head or there is an active
-  * fulfiller.
-  */
- boolean shouldSpin(SNode s) {
-     SNode h = head;
-     return (h == s || h == null || isFulfilling(h.mode));
- }
-
  /**
   * Unlinks s from the stack.
   */
  void clean(SNode s) {
  s.item = null; // forget item
- s.waiter = null; // forget thread
+ s.forgetWaiter();

  /*
   * At worst we may need to traverse entire stack to unlink
@@ -533,7 +502,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
   */

  /** Node class for TransferQueue. */
- static final class QNode {
+ static final class QNode implements ForkJoinPool.ManagedBlocker {
  volatile QNode next; // next node in queue
  volatile Object item; // CAS'ed to or from null
  volatile Thread waiter; // to control park/unpark
@@ -557,8 +526,8 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
  /**
   * Tries to cancel by CAS'ing ref to this as item.
   */
- void tryCancel(Object cmp) {
-     QITEM.compareAndSet(this, cmp, this);
+ boolean tryCancel(Object cmp) {
+     return QITEM.compareAndSet(this, cmp, this);
  }

  boolean isCancelled() {
@@ -574,14 +543,36 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
  return next == this;
  }

+ void forgetWaiter() {
+     QWAITER.setOpaque(this, null);
+ }
+
+ boolean isFulfilled() {
+     Object x;
+     return isData == ((x = item) == null) || x == this;
+ }
+
+ public final boolean isReleasable() {
+     Object x;
+     return isData == ((x = item) == null) || x == this ||
+         Thread.currentThread().isInterrupted();
+ }
+
+ public final boolean block() {
+     while (!isReleasable()) LockSupport.park();
+     return true;
+ }
+
  // VarHandle mechanics
  private static final VarHandle QITEM;
  private static final VarHandle QNEXT;
+ private static final VarHandle QWAITER;
  static {
  try {
  MethodHandles.Lookup l = MethodHandles.lookup();
  QITEM = l.findVarHandle(QNode.class, "item", Object.class);
  QNEXT = l.findVarHandle(QNode.class, "next", QNode.class);
+ QWAITER = l.findVarHandle(QNode.class, "waiter", Thread.class);
  } catch (ReflectiveOperationException e) {
  throw new ExceptionInInitializerError(e);
  }
@@ -661,104 +652,79 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
   * than having them implicitly interspersed.
   */

  QNode s = null; // constructed/reused as needed
  boolean isData = (e != null);

  for (;;) {
- QNode t = tail;
- QNode h = head;
- if (t == null || h == null) // saw uninitialized value
-     continue; // spin
- if (h == t || t.isData == isData) { // empty or same-mode
-     QNode tn = t.next;
-     if (t != tail) // inconsistent read
-         continue;
-     if (tn != null) { // lagging tail
+ QNode t = tail, h = head, m, tn; // m is node to fulfill
+ if (t == null || h == null)
+     ; // inconsistent
+ else if (h == t || t.isData == isData) { // empty or same-mode
+     if (t != tail) // inconsistent
+         ;
+     else if ((tn = t.next) != null) // lagging tail
          advanceTail(t, tn);
-         continue;
-     }
-     if (timed && nanos <= 0L) // can't wait
-         return null;
-     if (s == null)
-         s = new QNode(e, isData);
-     if (!t.casNext(null, s)) // failed to link in
-         continue;
-
-     advanceTail(t, s); // swing tail and wait
-     Object x = awaitFulfill(s, e, timed, nanos);
-     if (x == s) { // wait was cancelled
-         clean(t, s);
+     else if (timed && nanos <= 0L) // can't wait
          return null;
+     else if (t.casNext(null, (s != null) ? s :
+                        (s = new QNode(e, isData)))) {
+         advanceTail(t, s);
+         long deadline = timed ? System.nanoTime() + nanos : 0L;
+         Thread w = Thread.currentThread();
+         int stat = -1; // same idea as TransferStack
+         Object item;
+         while ((item = s.item) == e) {
+             if ((timed &&
+                  (nanos = deadline - System.nanoTime()) <= 0) ||
+                 w.isInterrupted()) {
+                 if (s.tryCancel(e)) {
+                     clean(t, s);
+                     return null;
+                 }
+             } else if ((item = s.item) != e) {
+                 break; // recheck
+             } else if (stat <= 0) {
+                 if (t.next == s) {
+                     if (stat < 0 && t.isFulfilled()) {
+                         stat = 0; // yield once if first
+                         Thread.yield();
+                     }
+                     else {
+                         stat = 1;
+                         s.waiter = w;
+                     }
+                 }
+             } else if (!timed) {
+                 LockSupport.setCurrentBlocker(this);
+                 try {
+                     ForkJoinPool.managedBlock(s);
+                 } catch (InterruptedException cannotHappen) { }
+                 LockSupport.setCurrentBlocker(null);
+             }
+             else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
+                 LockSupport.parkNanos(this, nanos);
+         }
+         if (stat == 1)
+             s.forgetWaiter();
+         if (!s.isOffList()) { // not already unlinked
+             advanceHead(t, s); // unlink if head
+             if (item != null) // and forget fields
+                 s.item = s;
+         }
+         return (item != null) ? (E)item : e;
      }

-     if (!s.isOffList()) { // not already unlinked
-         advanceHead(t, s); // unlink if head
-         if (x != null) // and forget fields
-             s.item = s;
-         s.waiter = null;
-     }
-     return (x != null) ? (E)x : e;
-
- } else { // complementary-mode
-     QNode m = h.next; // node to fulfill
-     if (t != tail || m == null || h != head)
-         continue; // inconsistent read
-
+ } else if ((m = h.next) != null && t == tail && h == head) {
+     Thread waiter;
      Object x = m.item;
-     if (isData == (x != null) || // m already fulfilled
-         x == m || // m cancelled
-         !m.casItem(x, e)) { // lost CAS
-         advanceHead(h, m); // dequeue and retry
-         continue;
-     }
-
-     advanceHead(h, m); // successfully fulfilled
-     LockSupport.unpark(m.waiter);
-     return (x != null) ? (E)x : e;
- }
- }
- }
-
- /**
-  * Spins/blocks until node s is fulfilled.
-  *
-  * @param s the waiting node
-  * @param e the comparison value for checking match
-  * @param timed true if timed wait
-  * @param nanos timeout value
-  * @return matched item, or s if cancelled
-  */
- Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
-     /* Same idea as TransferStack.awaitFulfill */
-     final long deadline = timed ? System.nanoTime() + nanos : 0L;
-     Thread w = Thread.currentThread();
-     int spins = (head.next == s)
-         ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
-         : 0;
-     for (;;) {
-         if (w.isInterrupted())
-             s.tryCancel(e);
-         Object x = s.item;
-         if (x != e)
-             return x;
-         if (timed) {
-             nanos = deadline - System.nanoTime();
-             if (nanos <= 0L) {
-                 s.tryCancel(e);
-                 continue;
+     boolean fulfilled = ((isData == (x == null)) &&
+                          x != m && m.casItem(x, e));
+     advanceHead(h, m); // (help) dequeue
+     if (fulfilled) {
+         if ((waiter = m.waiter) != null)
+             LockSupport.unpark(waiter);
+         return (x != null) ? (E)x : e;
      }
  }
-         if (spins > 0) {
-             --spins;
-             Thread.onSpinWait();
-         }
-         else if (s.waiter == null)
-             s.waiter = w;
-         else if (!timed)
-             LockSupport.park(this);
-         else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
-             LockSupport.parkNanos(this, nanos);
  }
  }

@@ -766,7 +732,7 @@ public class SynchronousQueue<E> extends AbstractQueue<E>
   * Gets rid of cancelled node s with original predecessor pred.
   */
  void clean(QNode pred, QNode s) {
- s.waiter = null; // forget thread
+ s.forgetWaiter();
  /*
   * At any given time, exactly one node on list cannot be
   * deleted -- the last inserted node. To accommodate this,
@@ -61,7 +61,6 @@ import java.util.function.Consumer;
  public class WhiteBox {
  final ThreadLocalRandom rnd = ThreadLocalRandom.current();
  final VarHandle HEAD, TAIL, ITEM, NEXT;
- final int SWEEP_THRESHOLD;

  public WhiteBox() throws ReflectiveOperationException {
  Class<?> qClass = LinkedTransferQueue.class;
@@ -72,9 +71,6 @@ public class WhiteBox {
  TAIL = lookup.findVarHandle(qClass, "tail", nodeClass);
  NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass);
  ITEM = lookup.findVarHandle(nodeClass, "item", Object.class);
- SWEEP_THRESHOLD = (int)
-     lookup.findStaticVarHandle(qClass, "SWEEP_THRESHOLD", int.class)
-     .get();
  }

  Object head(LinkedTransferQueue q) { return HEAD.getVolatile(q); }
@@ -367,36 +363,6 @@ public class WhiteBox {
  assertInvariants(q);
  }

- public void cancelledNodeSweeping() throws Throwable {
-     assertEquals(SWEEP_THRESHOLD & (SWEEP_THRESHOLD - 1), 0);
-     LinkedTransferQueue q = new LinkedTransferQueue();
-     Thread blockHead = null;
-     if (rnd.nextBoolean()) {
-         blockHead = new Thread(
-             () -> { try { q.take(); } catch (InterruptedException ok) {}});
-         blockHead.start();
-         while (nodeCount(q) != 2) { Thread.yield(); }
-         assertTrue(q.hasWaitingConsumer());
-         assertEquals(q.getWaitingConsumerCount(), 1);
-     }
-     int initialNodeCount = nodeCount(q);
-
-     // Some dead nodes do in fact accumulate ...
-     if (blockHead != null)
-         while (nodeCount(q) < initialNodeCount + SWEEP_THRESHOLD / 2)
-             q.poll(1L, TimeUnit.MICROSECONDS);
-
-     // ... but no more than SWEEP_THRESHOLD nodes accumulate
-     for (int i = rnd.nextInt(SWEEP_THRESHOLD * 10); i-->0; )
-         q.poll(1L, TimeUnit.MICROSECONDS);
-     assertTrue(nodeCount(q) <= initialNodeCount + SWEEP_THRESHOLD);
-
-     if (blockHead != null) {
-         blockHead.interrupt();
-         blockHead.join();
-     }
- }
-
  /** Checks conditions which should always be true. */
  void assertInvariants(LinkedTransferQueue q) {
  assertNotNull(head(q));