8169748: LinkedTransferQueue bulk remove is O(n^2)

8172023: Concurrent spliterators fail to handle exhaustion properly

Reviewed-by: martin, psandoz, smarks
This commit is contained in:
Doug Lea 2017-02-03 13:24:59 -08:00
parent 1f99fea68c
commit 7f519be836
10 changed files with 1961 additions and 751 deletions

View file

@ -81,12 +81,12 @@ import java.util.function.Predicate;
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements, and so may report
* inaccurate results if this collection is modified during traversal.
* Additionally, the bulk operations {@code addAll},
* {@code removeAll}, {@code retainAll}, {@code containsAll},
* and {@code toArray} are <em>not</em> guaranteed
* to be performed atomically. For example, an iterator operating
* concurrently with an {@code addAll} operation might view only some
* of the added elements.
*
* <p>Bulk operations that add, remove, or examine multiple elements,
* such as {@link #addAll}, {@link #removeIf} or {@link #forEach},
* are <em>not</em> guaranteed to be performed atomically.
* For example, a {@code forEach} traversal concurrent with an {@code
* addAll} operation might observe only some of the added elements.
*
* <p>This class and its iterator implement all of the <em>optional</em>
* methods of the {@link Queue} and {@link Iterator} interfaces.
@ -184,16 +184,30 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
static final class Node<E> {
volatile E item;
volatile Node<E> next;
}
/**
* Returns a new node holding item. Uses relaxed write because item
* can only be seen after piggy-backing publication via CAS.
*/
static <E> Node<E> newNode(E item) {
Node<E> node = new Node<E>();
ITEM.set(node, item);
return node;
/**
* Constructs a node holding item. Uses relaxed write because
* item can only be seen after piggy-backing publication via CAS.
*/
Node(E item) {
ITEM.set(this, item);
}
/** Constructs a dead dummy node. */
Node() {}
void appendRelaxed(Node<E> next) {
// assert next != null;
// assert this.next == null;
NEXT.set(this, next);
}
boolean casItem(E cmp, E val) {
// assert item == cmp || item == null;
// assert cmp != null;
// assert val == null;
return ITEM.compareAndSet(this, cmp, val);
}
}
/**
@ -220,7 +234,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* - tail.item may or may not be null.
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
* - tail.next may or may not be self-pointing to tail.
* - tail.next may or may not be self-linked.
*/
private transient volatile Node<E> tail;
@ -228,7 +242,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* Creates a {@code ConcurrentLinkedQueue} that is initially empty.
*/
public ConcurrentLinkedQueue() {
head = tail = newNode(null);
head = tail = new Node<E>();
}
/**
@ -243,16 +257,14 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
Node<E> newNode = newNode(Objects.requireNonNull(e));
Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
if (h == null)
h = t = newNode;
else {
NEXT.set(t, newNode);
t = newNode;
}
else
t.appendRelaxed(t = newNode);
}
if (h == null)
h = t = newNode(null);
h = t = new Node<E>();
head = h;
tail = t;
}
@ -287,14 +299,17 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* stale pointer that is now off the list.
*/
final Node<E> succ(Node<E> p) {
Node<E> next = p.next;
return (p == next) ? head : next;
if (p == (p = p.next))
p = head;
return p;
}
/**
* Tries to CAS pred.next (or head, if pred is null) from c to p.
* Caller must ensure that we're not unlinking the trailing node.
*/
private boolean tryCasSuccessor(Node<E> pred, Node<E> c, Node<E> p) {
// assert p != null;
// assert c.item == null;
// assert c != p;
if (pred != null)
@ -306,6 +321,29 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
return false;
}
/**
* Collapse dead nodes between pred and q.
* @param pred the last known live node, or null if none
* @param c the first dead node
* @param p the last dead node
* @param q p.next: the next live node, or null if at end
* @return either old pred or p if pred dead or CAS failed
*/
private Node<E> skipDeadNodes(Node<E> pred, Node<E> c, Node<E> p, Node<E> q) {
// assert pred != c;
// assert p != q;
// assert c.item == null;
// assert p.item == null;
if (q == null) {
// Never unlink trailing node.
if (c == p) return pred;
q = p;
}
return (tryCasSuccessor(pred, c, q)
&& (pred == null || ITEM.get(pred) != null))
? pred : p;
}
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
@ -314,7 +352,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final Node<E> newNode = newNode(Objects.requireNonNull(e));
final Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
@ -346,8 +384,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
restartFromHead: for (;;) {
for (Node<E> h = head, p = h, q;; p = q) {
final E item;
if ((item = p.item) != null
&& ITEM.compareAndSet(p, item, null)) {
if ((item = p.item) != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
@ -451,19 +488,20 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
public boolean contains(Object o) {
if (o == null) return false;
restartFromHead: for (;;) {
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
for (Node<E> p = head, pred = null; p != null; ) {
Node<E> q = p.next;
final E item;
if ((item = p.item) != null && o.equals(item))
return true;
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
q = p.next;
if (item != null || c != p) {
pred = p;
c = q;
if ((item = p.item) != null) {
if (o.equals(item))
return true;
pred = p; p = q; continue;
}
for (Node<E> c = p;; q = p.next) {
if (q == null || q.item != null) {
pred = skipDeadNodes(pred, c, p, q); p = q; break;
}
if (p == (p = q)) continue restartFromHead;
}
else if (p == q)
continue restartFromHead;
}
return false;
}
@ -483,23 +521,22 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
public boolean remove(Object o) {
if (o == null) return false;
restartFromHead: for (;;) {
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
for (Node<E> p = head, pred = null; p != null; ) {
Node<E> q = p.next;
final E item;
final boolean removed =
(item = p.item) != null
&& o.equals(item)
&& ITEM.compareAndSet(p, item, null);
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
if (removed)
return true;
q = p.next;
if (item != null || c != p) {
pred = p;
c = q;
if ((item = p.item) != null) {
if (o.equals(item) && p.casItem(item, null)) {
skipDeadNodes(pred, p, p, q);
return true;
}
pred = p; p = q; continue;
}
for (Node<E> c = p;; q = p.next) {
if (q == null || q.item != null) {
pred = skipDeadNodes(pred, c, p, q); p = q; break;
}
if (p == (p = q)) continue restartFromHead;
}
else if (p == q)
continue restartFromHead;
}
return false;
}
@ -525,13 +562,11 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
// Copy c into a private chain of Nodes
Node<E> beginningOfTheEnd = null, last = null;
for (E e : c) {
Node<E> newNode = newNode(Objects.requireNonNull(e));
Node<E> newNode = new Node<E>(Objects.requireNonNull(e));
if (beginningOfTheEnd == null)
beginningOfTheEnd = last = newNode;
else {
NEXT.set(last, newNode);
last = newNode;
}
else
last.appendRelaxed(last = newNode);
}
if (beginningOfTheEnd == null)
return false;
@ -677,7 +712,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
*/
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
if (a == null) throw new NullPointerException();
Objects.requireNonNull(a);
return (T[]) toArrayInternal(a);
}
@ -757,6 +792,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
}
// Default implementation of forEachRemaining is "good enough".
public void remove() {
Node<E> l = lastRet;
if (l == null) throw new IllegalStateException();
@ -806,16 +843,14 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
Node<E> h = null, t = null;
for (Object item; (item = s.readObject()) != null; ) {
@SuppressWarnings("unchecked")
Node<E> newNode = newNode((E) item);
Node<E> newNode = new Node<E>((E) item);
if (h == null)
h = t = newNode;
else {
NEXT.set(t, newNode);
t = newNode;
}
else
t.appendRelaxed(t = newNode);
}
if (h == null)
h = t = newNode(null);
h = t = new Node<E>();
head = h;
tail = t;
}
@ -828,62 +863,49 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
boolean exhausted; // true when no more nodes
public Spliterator<E> trySplit() {
Node<E> p;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
((p = current) != null || (p = first()) != null) &&
p.next != null) {
Object[] a = new Object[n];
int i = 0;
do {
if ((a[i] = p.item) != null)
++i;
if (p == (p = p.next))
p = first();
} while (p != null && i < n);
if ((current = p) == null)
exhausted = true;
if (i > 0) {
batch = i;
return Spliterators.spliterator
(a, 0, i, (Spliterator.ORDERED |
Spliterator.NONNULL |
Spliterator.CONCURRENT));
Node<E> p, q;
if ((p = current()) == null || (q = p.next) == null)
return null;
int i = 0, n = batch = Math.min(batch + 1, MAX_BATCH);
Object[] a = null;
do {
final E e;
if ((e = p.item) != null) {
if (a == null)
a = new Object[n];
a[i++] = e;
}
}
return null;
if (p == (p = q))
p = first();
} while (p != null && (q = p.next) != null && i < n);
setCurrent(p);
return (i == 0) ? null :
Spliterators.spliterator(a, 0, i, (Spliterator.ORDERED |
Spliterator.NONNULL |
Spliterator.CONCURRENT));
}
public void forEachRemaining(Consumer<? super E> action) {
Node<E> p;
if (action == null) throw new NullPointerException();
if (!exhausted &&
((p = current) != null || (p = first()) != null)) {
Objects.requireNonNull(action);
final Node<E> p;
if ((p = current()) != null) {
current = null;
exhausted = true;
do {
E e = p.item;
if (p == (p = p.next))
p = first();
if (e != null)
action.accept(e);
} while (p != null);
forEachFrom(action, p);
}
}
public boolean tryAdvance(Consumer<? super E> action) {
Objects.requireNonNull(action);
Node<E> p;
if (action == null) throw new NullPointerException();
if (!exhausted &&
((p = current) != null || (p = first()) != null)) {
if ((p = current()) != null) {
E e;
do {
e = p.item;
if (p == (p = p.next))
p = first();
} while (e == null && p != null);
if ((current = p) == null)
exhausted = true;
setCurrent(p);
if (e != null) {
action.accept(e);
return true;
@ -892,11 +914,24 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
return false;
}
private void setCurrent(Node<E> p) {
if ((current = p) == null)
exhausted = true;
}
private Node<E> current() {
Node<E> p;
if ((p = current) == null && !exhausted)
setCurrent(p = first());
return p;
}
public long estimateSize() { return Long.MAX_VALUE; }
public int characteristics() {
return Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT;
return (Spliterator.ORDERED |
Spliterator.NONNULL |
Spliterator.CONCURRENT);
}
}
@ -963,22 +998,22 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
// c will be CASed to collapse intervening dead nodes between
// pred (or head if null) and p.
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
q = p.next;
final E item; boolean pAlive;
if (pAlive = ((item = p.item) != null)) {
if (filter.test(item)) {
if (ITEM.compareAndSet(p, item, null))
if (p.casItem(item, null))
removed = true;
pAlive = false;
}
}
if ((q = p.next) == null || pAlive || --hops == 0) {
if (pAlive || q == null || --hops == 0) {
// p might already be self-linked here, but if so:
// - CASing head will surely fail
// - CASing pred's next will be useless but harmless.
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
// if c != p, CAS failed, so abandon old pred
if (pAlive || c != p) {
if ((c != p && !tryCasSuccessor(pred, c, c = p))
|| pAlive) {
// if CAS failed or alive, abandon old pred
hops = MAX_HOPS;
pred = p;
c = q;
@ -990,35 +1025,40 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
}
/**
* Runs action on each element found during a traversal starting at p.
* If p is null, the action is not run.
*/
void forEachFrom(Consumer<? super E> action, Node<E> p) {
for (Node<E> pred = null; p != null; ) {
Node<E> q = p.next;
final E item;
if ((item = p.item) != null) {
action.accept(item);
pred = p; p = q; continue;
}
for (Node<E> c = p;; q = p.next) {
if (q == null || q.item != null) {
pred = skipDeadNodes(pred, c, p, q); p = q; break;
}
if (p == (p = q)) { pred = null; p = head; break; }
}
}
}
/**
* @throws NullPointerException {@inheritDoc}
*/
public void forEach(Consumer<? super E> action) {
Objects.requireNonNull(action);
restartFromHead: for (;;) {
for (Node<E> p = head, c = p, pred = null, q; p != null; p = q) {
final E item;
if ((item = p.item) != null)
action.accept(item);
if (c != p && tryCasSuccessor(pred, c, p))
c = p;
q = p.next;
if (item != null || c != p) {
pred = p;
c = q;
}
else if (p == q)
continue restartFromHead;
}
return;
}
forEachFrom(action, head);
}
// VarHandle mechanics
private static final VarHandle HEAD;
private static final VarHandle TAIL;
private static final VarHandle ITEM;
private static final VarHandle NEXT;
static final VarHandle ITEM;
static final VarHandle NEXT;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();