8170484: Miscellaneous changes imported from jsr166 CVS 2016-12

Reviewed-by: martin, smarks, psandoz
This commit is contained in:
Doug Lea 2016-12-21 14:26:52 -08:00
parent bdab1d842f
commit 1414335f71
22 changed files with 543 additions and 328 deletions

View file

@ -47,6 +47,7 @@ import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
@ -112,7 +113,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
/*
* This is a modification of the Michael & Scott algorithm,
* adapted for a garbage-collected environment, with support for
* interior node deletion (to support remove(Object)). For
* interior node deletion (to support e.g. remove(Object)). For
* explanation, read the paper.
*
* Note that like most non-blocking algorithms in this package,
@ -160,12 +161,13 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* it is possible for tail to lag behind head (why not)?
*
* CASing a Node's item reference to null atomically removes the
* element from the queue. Iterators skip over Nodes with null
* items. Prior implementations of this class had a race between
* poll() and remove(Object) where the same element would appear
* to be successfully removed by two concurrent operations. The
* method remove(Object) also lazily unlinks deleted Nodes, but
* this is merely an optimization.
* element from the queue, leaving a "dead" node that should later
* be unlinked (but unlinking is merely an optimization).
* Interior element removal methods (other than Iterator.remove())
* keep track of the predecessor node during traversal so that the
* node can be CAS-unlinked. Some traversal methods try to unlink
* any deleted nodes encountered during traversal. See comments
* in bulkRemove.
*
* When constructing a Node (before enqueuing it) we avoid paying
* for a volatile write to item. This allows the cost of enqueue
@ -289,6 +291,21 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
return (p == next) ? head : next;
}
/**
* Tries to CAS pred.next (or head, if pred is null) from c to p.
*/
private boolean tryCasSuccessor(Node<E> pred, Node<E> c, Node<E> p) {
// assert c.item == null;
// assert c != p;
if (pred != null)
return NEXT.compareAndSet(pred, c, p);
if (HEAD.compareAndSet(this, c, p)) {
NEXT.setRelease(c, c);
return true;
}
return false;
}
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
@ -326,12 +343,11 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && ITEM.compareAndSet(p, item, null)) {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
if ((item = p.item) != null
&& ITEM.compareAndSet(p, item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
@ -344,25 +360,21 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
public E peek() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
if ((item = p.item) != null
|| (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
@ -376,9 +388,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* of losing a race to a concurrent poll().
*/
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
@ -386,8 +397,6 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
@ -440,14 +449,24 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o != null) {
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
if (o == null) return false;
restartFromHead: for (;;) {
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
final E item;
if ((item = p.item) != null && o.equals(item))
return true;
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
q = p.next;
if (item != null || c != p) {
pred = p;
c = q;
}
else if (p == q)
continue restartFromHead;
}
return false;
}
return false;
}
/**
@ -462,27 +481,28 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
if (!o.equals(item)) {
next = succ(p);
continue;
}
removed = ITEM.compareAndSet(p, item, null);
}
next = succ(p);
if (pred != null && next != null) // unlink
NEXT.weakCompareAndSet(pred, p, next);
if (o == null) return false;
restartFromHead: for (;;) {
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
final E item;
final boolean removed =
(item = p.item) != null
&& o.equals(item)
&& ITEM.compareAndSet(p, item, null);
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
if (removed)
return true;
q = p.next;
if (item != null || c != p) {
pred = p;
c = q;
}
else if (p == q)
continue restartFromHead;
}
return false;
}
return false;
}
/**
@ -553,8 +573,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
int charLength = 0;
int size = 0;
for (Node<E> p = first(); p != null;) {
E item = p.item;
if (item != null) {
final E item;
if ((item = p.item) != null) {
if (a == null)
a = new String[4];
else if (size == a.length)
@ -579,8 +599,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
restartFromHead: for (;;) {
int size = 0;
for (Node<E> p = first(); p != null;) {
E item = p.item;
if (item != null) {
final E item;
if ((item = p.item) != null) {
if (x == null)
x = new Object[4];
else if (size == x.length)
@ -697,7 +717,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
restartFromHead: for (;;) {
Node<E> h, p, q;
for (p = h = head;; p = q) {
E item;
final E item;
if ((item = p.item) != null) {
nextNode = p;
nextItem = item;
@ -762,8 +782,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
// Write out all elements in the proper order.
for (Node<E> p = first(); p != null; p = succ(p)) {
Object item = p.item;
if (item != null)
final E item;
if ((item = p.item) != null)
s.writeObject(item);
}
@ -801,23 +821,18 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
/** A customized variant of Spliterators.IteratorSpliterator */
static final class CLQSpliterator<E> implements Spliterator<E> {
final class CLQSpliterator implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
final ConcurrentLinkedQueue<E> queue;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
CLQSpliterator(ConcurrentLinkedQueue<E> queue) {
this.queue = queue;
}
public Spliterator<E> trySplit() {
Node<E> p;
final ConcurrentLinkedQueue<E> q = this.queue;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
((p = current) != null || (p = q.first()) != null) &&
((p = current) != null || (p = first()) != null) &&
p.next != null) {
Object[] a = new Object[n];
int i = 0;
@ -825,7 +840,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
if ((a[i] = p.item) != null)
++i;
if (p == (p = p.next))
p = q.first();
p = first();
} while (p != null && i < n);
if ((current = p) == null)
exhausted = true;
@ -843,14 +858,13 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
public void forEachRemaining(Consumer<? super E> action) {
Node<E> p;
if (action == null) throw new NullPointerException();
final ConcurrentLinkedQueue<E> q = this.queue;
if (!exhausted &&
((p = current) != null || (p = q.first()) != null)) {
((p = current) != null || (p = first()) != null)) {
exhausted = true;
do {
E e = p.item;
if (p == (p = p.next))
p = q.first();
p = first();
if (e != null)
action.accept(e);
} while (p != null);
@ -860,14 +874,13 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
public boolean tryAdvance(Consumer<? super E> action) {
Node<E> p;
if (action == null) throw new NullPointerException();
final ConcurrentLinkedQueue<E> q = this.queue;
if (!exhausted &&
((p = current) != null || (p = q.first()) != null)) {
((p = current) != null || (p = first()) != null)) {
E e;
do {
e = p.item;
if (p == (p = p.next))
p = q.first();
p = first();
} while (e == null && p != null);
if ((current = p) == null)
exhausted = true;
@ -905,7 +918,100 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
*/
@Override
public Spliterator<E> spliterator() {
return new CLQSpliterator<E>(this);
return new CLQSpliterator();
}
/**
* @throws NullPointerException {@inheritDoc}
*/
public boolean removeIf(Predicate<? super E> filter) {
Objects.requireNonNull(filter);
return bulkRemove(filter);
}
/**
* @throws NullPointerException {@inheritDoc}
*/
public boolean removeAll(Collection<?> c) {
Objects.requireNonNull(c);
return bulkRemove(e -> c.contains(e));
}
/**
* @throws NullPointerException {@inheritDoc}
*/
public boolean retainAll(Collection<?> c) {
Objects.requireNonNull(c);
return bulkRemove(e -> !c.contains(e));
}
public void clear() {
bulkRemove(e -> true);
}
/**
* Tolerate this many consecutive dead nodes before CAS-collapsing.
* Amortized cost of clear() is (1 + 1/MAX_HOPS) CASes per element.
*/
private static final int MAX_HOPS = 8;
/** Implementation of bulk remove methods. */
private boolean bulkRemove(Predicate<? super E> filter) {
boolean removed = false;
restartFromHead: for (;;) {
int hops = MAX_HOPS;
// c will be CASed to collapse intervening dead nodes between
// pred (or head if null) and p.
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
final E item; boolean pAlive;
if (pAlive = ((item = p.item) != null)) {
if (filter.test(item)) {
if (ITEM.compareAndSet(p, item, null))
removed = true;
pAlive = false;
}
}
if ((q = p.next) == null || pAlive || --hops == 0) {
// p might already be self-linked here, but if so:
// - CASing head will surely fail
// - CASing pred's next will be useless but harmless.
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
// if c != p, CAS failed, so abandon old pred
if (pAlive || c != p) {
hops = MAX_HOPS;
pred = p;
c = q;
}
} else if (p == q)
continue restartFromHead;
}
return removed;
}
}
/**
* @throws NullPointerException {@inheritDoc}
*/
public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
restartFromHead: for (;;) {
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
final E item;
if ((item = p.item) != null)
action.accept(item);
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
q = p.next;
if (item != null || c != p) {
pred = p;
c = q;
}
else if (p == q)
continue restartFromHead;
}
return;
}
}
// VarHandle mechanics