8134853: Bulk integration of java.util.concurrent classes

8080939: ForkJoinPool and Phaser deadlock
8044616: Clients of Unsafe.compareAndSwapLong need to beware of using direct stores to the same field
8071638: [JAVADOC] Buggy example in javadoc for afterExecute to access a submitted job's Throwable
8043743: Data missed in java.util.concurrent.LinkedTransferQueue
8054446: Repeated offer and remove on ConcurrentLinkedQueue lead to an OutOfMemoryError
8031374: TEST_BUG: java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java fails Intermittently
8034208: Cleanup to test/java/util/concurrent/BlockingQueue/Interrupt.java
8035661: Test fix java/util/concurrent/ConcurrentQueues/OfferRemoveLoops.java from jsr166 CVS
8062841: ConcurrentHashMap.computeIfAbsent stuck in an endless loop
8073208: javadoc typo in java.util.concurrent.Executor
8073704: FutureTask.isDone returns true when task has not yet completed
8037093: java/util/concurrent/locks/Lock/TimedAcquireLeak.java fails intermittently
8022642: ScheduledThreadPoolExecutor with zero corePoolSize create endlessly threads
8065320: Busy loop in ThreadPoolExecutor.getTask for ScheduledThreadPoolExecutor
8129861: High processor load for ScheduledThreadPoolExecutor with 0 core threads
8051859: ScheduledExecutorService.scheduleWithFixedDelay fails with max delay
7146994: example afterExecute for ScheduledThreadPoolExecutor hangs

Reviewed-by: martin, psandoz, chegar
This commit is contained in:
Doug Lea 2015-10-13 16:45:35 -07:00
parent 0f49a089d6
commit c38b0eaba5
124 changed files with 4333 additions and 2441 deletions

View file

@ -36,10 +36,11 @@
package java.util.concurrent;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
@ -60,9 +61,9 @@ import java.util.function.Consumer;
* does not permit the use of {@code null} elements.
*
* <p>This implementation employs an efficient <em>non-blocking</em>
* algorithm based on one described in <a
* href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
* Fast, and Practical Non-Blocking and Blocking Concurrent Queue
* algorithm based on one described in
* <a href="http://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf">
* Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue
* Algorithms</a> by Maged M. Michael and Michael L. Scott.
*
* <p>Iterators are <i>weakly consistent</i>, returning elements
@ -100,7 +101,7 @@ import java.util.function.Consumer;
*
* @since 1.5
* @author Doug Lea
* @param <E> the type of elements held in this collection
* @param <E> the type of elements held in this queue
*/
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
@ -180,45 +181,28 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
private static class Node<E> {
volatile E item;
volatile Node<E> next;
}
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
/**
* Returns a new node holding item. Uses relaxed write because item
* can only be seen after piggy-backing publication via casNext.
*/
static <E> Node<E> newNode(E item) {
Node<E> node = new Node<E>();
U.putObject(node, ITEM, item);
return node;
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
static <E> boolean casItem(Node<E> node, E cmp, E val) {
return U.compareAndSwapObject(node, ITEM, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
static <E> void lazySetNext(Node<E> node, Node<E> val) {
U.putOrderedObject(node, NEXT, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
static <E> boolean casNext(Node<E> node, Node<E> cmp, Node<E> val) {
return U.compareAndSwapObject(node, NEXT, cmp, val);
}
/**
@ -233,7 +217,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* - it is permitted for tail to lag behind head, that is, for tail
* to not be reachable from head!
*/
private transient volatile Node<E> head;
transient volatile Node<E> head;
/**
* A node from which the last node on list (that is, the unique
@ -253,7 +237,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* Creates a {@code ConcurrentLinkedQueue} that is initially empty.
*/
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
head = tail = newNode(null);
}
/**
@ -268,17 +252,16 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
public ConcurrentLinkedQueue(Collection<? extends E> c) {
Node<E> h = null, t = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
Node<E> newNode = newNode(Objects.requireNonNull(e));
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
lazySetNext(t, newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
h = t = newNode(null);
head = h;
tail = t;
}
@ -302,8 +285,9 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* as sentinel for succ(), below.
*/
final void updateHead(Node<E> h, Node<E> p) {
// assert h != null && p != null && (h == p || h.item == null);
if (h != p && casHead(h, p))
h.lazySetNext(h);
lazySetNext(h, h);
}
/**
@ -324,14 +308,13 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
final Node<E> newNode = newNode(Objects.requireNonNull(e));
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
if (casNext(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
@ -359,7 +342,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
if (item != null && casItem(p, item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
@ -446,13 +429,17 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @return the number of elements in this queue
*/
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
restartFromHead: for (;;) {
int count = 0;
for (Node<E> p = first(); p != null;) {
if (p.item != null)
if (++count == Integer.MAX_VALUE)
break; // @see Collection.size()
if (p == (p = p.next))
continue restartFromHead;
}
return count;
}
}
/**
@ -464,11 +451,12 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
if (o != null) {
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
}
}
return false;
}
@ -485,19 +473,25 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) {
Node<E> next = succ(p);
if (pred != null && next != null)
pred.casNext(p, next);
return true;
if (o != null) {
Node<E> next, pred = null;
for (Node<E> p = first(); p != null; pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
if (!o.equals(item)) {
next = succ(p);
continue;
}
removed = casItem(p, item, null);
}
next = succ(p);
if (pred != null && next != null) // unlink
casNext(pred, p, next);
if (removed)
return true;
}
pred = p;
}
return false;
}
@ -522,12 +516,11 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
// Copy c into a private chain of Nodes
Node<E> beginningOfTheEnd = null, last = null;
for (E e : c) {
checkNotNull(e);
Node<E> newNode = new Node<E>(e);
Node<E> newNode = newNode(Objects.requireNonNull(e));
if (beginningOfTheEnd == null)
beginningOfTheEnd = last = newNode;
else {
last.lazySetNext(newNode);
lazySetNext(last, newNode);
last = newNode;
}
}
@ -539,7 +532,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, beginningOfTheEnd)) {
if (casNext(p, null, beginningOfTheEnd)) {
// Successful CAS is the linearization point
// for all elements to be added to this queue.
if (!casTail(t, last)) {
@ -565,6 +558,62 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
}
}
public String toString() {
String[] a = null;
restartFromHead: for (;;) {
int charLength = 0;
int size = 0;
for (Node<E> p = first(); p != null;) {
E item = p.item;
if (item != null) {
if (a == null)
a = new String[4];
else if (size == a.length)
a = Arrays.copyOf(a, 2 * size);
String s = item.toString();
a[size++] = s;
charLength += s.length();
}
if (p == (p = p.next))
continue restartFromHead;
}
if (size == 0)
return "[]";
return Helpers.toString(a, size, charLength);
}
}
private Object[] toArrayInternal(Object[] a) {
Object[] x = a;
restartFromHead: for (;;) {
int size = 0;
for (Node<E> p = first(); p != null;) {
E item = p.item;
if (item != null) {
if (x == null)
x = new Object[4];
else if (size == x.length)
x = Arrays.copyOf(x, 2 * (size + 4));
x[size++] = item;
}
if (p == (p = p.next))
continue restartFromHead;
}
if (x == null)
return new Object[0];
else if (a != null && size <= a.length) {
if (a != x)
System.arraycopy(x, 0, a, 0, size);
if (size < a.length)
a[size] = null;
return a;
}
return (size == x.length) ? x : Arrays.copyOf(x, size);
}
}
/**
* Returns an array containing all of the elements in this queue, in
* proper sequence.
@ -579,14 +628,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
// Use ArrayList to deal with resizing.
ArrayList<E> al = new ArrayList<E>();
for (Node<E> p = first(); p != null; p = succ(p)) {
E item = p.item;
if (item != null)
al.add(item);
}
return al.toArray();
return toArrayInternal(null);
}
/**
@ -610,7 +652,7 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
* The following code can be used to dump the queue into a newly
* allocated array of {@code String}:
*
* <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
* <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
*
* Note that {@code toArray(new Object[0])} is identical in function to
* {@code toArray()}.
@ -626,28 +668,8 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
*/
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
// try to use sent-in array
int k = 0;
Node<E> p;
for (p = first(); p != null && k < a.length; p = succ(p)) {
E item = p.item;
if (item != null)
a[k++] = (T)item;
}
if (p == null) {
if (k < a.length)
a[k] = null;
return a;
}
// If won't fit, use ArrayList version
ArrayList<E> al = new ArrayList<E>();
for (Node<E> q = first(); q != null; q = succ(q)) {
E item = q.item;
if (item != null)
al.add(item);
}
return al.toArray(a);
if (a == null) throw new NullPointerException();
return (T[]) toArrayInternal(a);
}
/**
@ -683,54 +705,47 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
private Node<E> lastRet;
Itr() {
advance();
}
/**
* Moves to next valid node and returns item to return for
* next(), or null if no such.
*/
private E advance() {
lastRet = nextNode;
E x = nextItem;
Node<E> pred, p;
if (nextNode == null) {
p = first();
pred = null;
} else {
pred = nextNode;
p = succ(nextNode);
}
for (;;) {
if (p == null) {
nextNode = null;
nextItem = null;
return x;
}
E item = p.item;
if (item != null) {
nextNode = p;
nextItem = item;
return x;
} else {
// skip over nulls
Node<E> next = succ(p);
if (pred != null && next != null)
pred.casNext(p, next);
p = next;
restartFromHead: for (;;) {
Node<E> h, p, q;
for (p = h = head;; p = q) {
E item;
if ((item = p.item) != null) {
nextNode = p;
nextItem = item;
break;
}
else if ((q = p.next) == null)
break;
else if (p == q)
continue restartFromHead;
}
updateHead(h, p);
return;
}
}
public boolean hasNext() {
return nextNode != null;
return nextItem != null;
}
public E next() {
if (nextNode == null) throw new NoSuchElementException();
return advance();
final Node<E> pred = nextNode;
if (pred == null) throw new NoSuchElementException();
// assert nextItem != null;
lastRet = pred;
E item = null;
for (Node<E> p = succ(pred), q;; p = q) {
if (p == null || (item = p.item) != null) {
nextNode = p;
E x = nextItem;
nextItem = item;
return x;
}
// unlink deleted nodes
if ((q = succ(p)) != null)
casNext(pred, p, q);
}
}
public void remove() {
@ -780,19 +795,18 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
// Read in elements until trailing null sentinel found
Node<E> h = null, t = null;
Object item;
while ((item = s.readObject()) != null) {
for (Object item; (item = s.readObject()) != null; ) {
@SuppressWarnings("unchecked")
Node<E> newNode = new Node<E>((E) item);
Node<E> newNode = newNode((E) item);
if (h == null)
h = t = newNode;
else {
t.lazySetNext(newNode);
lazySetNext(t, newNode);
t = newNode;
}
}
if (h == null)
h = t = new Node<E>(null);
h = t = newNode(null);
head = h;
tail = t;
}
@ -829,8 +843,9 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
if (i > 0) {
batch = i;
return Spliterators.spliterator
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT);
(a, 0, i, (Spliterator.ORDERED |
Spliterator.NONNULL |
Spliterator.CONCURRENT));
}
}
return null;
@ -904,38 +919,32 @@ public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
return new CLQSpliterator<E>(this);
}
/**
* Throws NullPointerException if argument is null.
*
* @param v the element
*/
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
return U.compareAndSwapObject(this, TAIL, cmp, val);
}
private boolean casHead(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
return U.compareAndSwapObject(this, HEAD, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long HEAD;
private static final long TAIL;
private static final long ITEM;
private static final long NEXT;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = ConcurrentLinkedQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
} catch (Exception e) {
HEAD = U.objectFieldOffset
(ConcurrentLinkedQueue.class.getDeclaredField("head"));
TAIL = U.objectFieldOffset
(ConcurrentLinkedQueue.class.getDeclaredField("tail"));
ITEM = U.objectFieldOffset
(Node.class.getDeclaredField("item"));
NEXT = U.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}