8080603: Replace Unsafe with VarHandle in java.util.concurrent classes

8153715: Use Unsafe.weakCompareAndSet in java.util.concurrent

Reviewed-by: martin, psandoz, rriggs, plevart, dfuchs, shade
This commit is contained in:
Doug Lea 2016-07-15 14:04:09 -07:00
parent a09ddefd05
commit 14d4754bdd
48 changed files with 3971 additions and 1288 deletions

View file

@ -35,6 +35,8 @@
package java.util.concurrent;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.AbstractQueue;
import java.util.Arrays;
import java.util.Collection;
@ -166,9 +168,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* this is merely an optimization.
*
* When constructing a Node (before enqueuing it) we avoid paying
* for a volatile write to item by using Unsafe.putObject instead
* of a normal write. This allows the cost of enqueue to be
* "one-and-a-half" CASes.
* for a volatile write to item. This allows the cost of enqueue
* to be "one-and-a-half" CASes.
*
* Both head and tail may or may not point to a Node with a
* non-null item. If the queue is empty, all items must of course
@ -178,33 +179,21 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* optimization.
*/
private static class Node<E> {
static final class Node<E> {
volatile E item;
volatile Node<E> next;
}
/**
* Returns a new node holding item. Uses relaxed write because item
* can only be seen after piggy-backing publication via casNext.
* can only be seen after piggy-backing publication via CAS.
*/
static <E> Node<E> newNode(E item) {
Node<E> node = new Node<E>();
U.putObject(node, ITEM, item);
ITEM.set(node, item);
return node;
}
static <E> boolean casItem(Node<E> node, E cmp, E val) {
return U.compareAndSwapObject(node, ITEM, cmp, val);
}
static <E> void lazySetNext(Node<E> node, Node<E> val) {
U.putObjectRelease(node, NEXT, val);
}
static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) {
return U.compareAndSwapObject(node, NEXT, cmp, val);
}
/**
* A node from which the first live (non-deleted) node (if any)
* can be reached in O(1) time.
@ -256,7 +245,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
if (h == null)
h = t = newNode;
else {
lazySetNext(t, newNode);
NEXT.set(t, newNode);
t = newNode;
}
}
@ -286,8 +275,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
*/
final void updateHead(Node<E> h, Node<E> p) {
// assert h != null && p != null && (h == p || h.item == null);
if (h != p && casHead(h, p))
lazySetNext(h, h);
if (h != p && HEAD.compareAndSet(this, h, p))
NEXT.setRelease(h, h);
}
/**
@ -314,12 +303,12 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
Node<E> q = p.next;
if (q == null) {
// p is last node
if (casNext(p, null, newNode)) {
if (NEXT.compareAndSet(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
if (p != t) // hop two nodes at a time; failure is OK
TAIL.weakCompareAndSetVolatile(this, t, newNode);
return true;
}
// Lost CAS race to another thread; re-read next
@ -342,7 +331,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && casItem(p, item, null)) {
if (item != null && ITEM.compareAndSet(p, item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
@ -483,12 +472,12 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
next = succ(p);
continue;
}
removed = casItem(p, item, null);
removed = ITEM.compareAndSet(p, item, null);
}
next = succ(p);
if (pred != null && next != null) // unlink
casNext(pred, p, next);
NEXT.weakCompareAndSetVolatile(pred, p, next);
if (removed)
return true;
}
@ -520,7 +509,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
if (beginningOfTheEnd == null)
beginningOfTheEnd = last = newNode;
else {
lazySetNext(last, newNode);
NEXT.set(last, newNode);
last = newNode;
}
}
@ -532,15 +521,15 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
Node<E> q = p.next;
if (q == null) {
// p is last node
if (casNext(p, null, beginningOfTheEnd)) {
if (NEXT.compareAndSet(p, null, beginningOfTheEnd)) {
// Successful CAS is the linearization point
// for all elements to be added to this queue.
if (!casTail(t, last)) {
if (!TAIL.weakCompareAndSetVolatile(this, t, last)) {
// Try a little harder to update tail,
// since we may be adding many elements.
t = tail;
if (last.next == null)
casTail(t, last);
TAIL.weakCompareAndSetVolatile(this, t, last);
}
return true;
}
@ -744,7 +733,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
// unlink deleted nodes
if ((q = succ(p)) != null)
casNext(pred, p, q);
NEXT.compareAndSet(pred, p, q);
}
}
@ -801,7 +790,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
if (h == null)
h = t = newNode;
else {
lazySetNext(t, newNode);
NEXT.set(t, newNode);
t = newNode;
}
}
@ -919,31 +908,20 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
return new CLQSpliterator<E>(this);
}
private boolean casTail(Node<E> cmp, Node<E> val) {
return U.compareAndSwapObject(this, TAIL, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return U.compareAndSwapObject(this, HEAD, cmp, val);
}
// Unsafe mechanics
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long HEAD;
private static final long TAIL;
private static final long ITEM;
private static final long NEXT;
// VarHandle mechanics
private static final VarHandle HEAD;
private static final VarHandle TAIL;
private static final VarHandle ITEM;
private static final VarHandle NEXT;
static {
try {
HEAD = U.objectFieldOffset
(ConcurrentLinkedQueue.class.getDeclaredField("head"));
TAIL = U.objectFieldOffset
(ConcurrentLinkedQueue.class.getDeclaredField("tail"));
ITEM = U.objectFieldOffset
(Node.class.getDeclaredField("item"));
NEXT = U.objectFieldOffset
(Node.class.getDeclaredField("next"));
MethodHandles.Lookup l = MethodHandles.lookup();
HEAD = l.findVarHandle(ConcurrentLinkedQueue.class, "head",
Node.class);
TAIL = l.findVarHandle(ConcurrentLinkedQueue.class, "tail",
Node.class);
ITEM = l.findVarHandle(Node.class, "item", Object.class);
NEXT = l.findVarHandle(Node.class, "next", Node.class);
} catch (ReflectiveOperationException e) {
throw new Error(e);
}