8215326: Test java/util/concurrent/ConcurrentHashMap/ToArray.java hangs after j.u.c updates

Reviewed-by: martin, dholmes
This commit is contained in:
Doug Lea 2018-12-12 20:13:39 -08:00
parent cf21c5ef11
commit 3d9ab36ca0

View file

@ -445,7 +445,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* if to its current value). This would be extremely costly. So * if to its current value). This would be extremely costly. So
* we relax it in several ways: (1) Producers only signal when * we relax it in several ways: (1) Producers only signal when
* their queue is possibly empty at some point during a push * their queue is possibly empty at some point during a push
* operation. (2) Other workers propagate this signal * operation (which requires conservatively checking size zero or
* one to cover races). (2) Other workers propagate this signal
* when they find tasks in a queue with size greater than one. (3) * when they find tasks in a queue with size greater than one. (3)
* Workers only enqueue after scanning (see below) and not finding * Workers only enqueue after scanning (see below) and not finding
* any tasks. (4) Rather than CASing ctl to its current value in * any tasks. (4) Rather than CASing ctl to its current value in
@ -761,8 +762,10 @@ public class ForkJoinPool extends AbstractExecutorService {
/** /**
* The maximum number of top-level polls per worker before * The maximum number of top-level polls per worker before
* checking other queues, expressed as a bit shift. See above for * checking other queues, expressed as a bit shift to, in effect,
* rationale. * multiply by pool size, and then use as random value mask, so
* average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See
* above for rationale.
*/ */
static final int TOP_BOUND_SHIFT = 10; static final int TOP_BOUND_SHIFT = 10;
@ -838,17 +841,18 @@ public class ForkJoinPool extends AbstractExecutorService {
*/ */
final void push(ForkJoinTask<?> task) { final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinTask<?>[] a;
int s = top, d = s - base, cap, m; int s = top, d, cap, m;
ForkJoinPool p = pool; ForkJoinPool p = pool;
if ((a = array) != null && (cap = a.length) > 0) { if ((a = array) != null && (cap = a.length) > 0) {
QA.setRelease(a, (m = cap - 1) & s, task); QA.setRelease(a, (m = cap - 1) & s, task);
top = s + 1; top = s + 1;
if (d == m) if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
growArray(false); p != null) { // size 0 or 1
else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) { VarHandle.fullFence();
VarHandle.fullFence(); // was empty p.signalWork();
p.signalWork(null);
} }
else if (d == m)
growArray(false);
} }
} }
@ -859,16 +863,16 @@ public class ForkJoinPool extends AbstractExecutorService {
final boolean lockedPush(ForkJoinTask<?> task) { final boolean lockedPush(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinTask<?>[] a;
boolean signal = false; boolean signal = false;
int s = top, d = s - base, cap, m; int s = top, b = base, cap, d;
if ((a = array) != null && (cap = a.length) > 0) { if ((a = array) != null && (cap = a.length) > 0) {
a[(m = (cap - 1)) & s] = task; a[(cap - 1) & s] = task;
top = s + 1; top = s + 1;
if (d == m) if (b - s + cap - 1 == 0)
growArray(true); growArray(true);
else { else {
phase = 0; // full volatile unlock phase = 0; // full volatile unlock
if (a[m & (s - 1)] == null) if (((s - base) & ~1) == 0) // size 0 or 1
signal = true; // was empty signal = true;
} }
} }
return signal; return signal;
@ -1010,30 +1014,25 @@ public class ForkJoinPool extends AbstractExecutorService {
* queue, up to bound n (to avoid infinite unfairness). * queue, up to bound n (to avoid infinite unfairness).
*/ */
final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) { final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
int nstolen = 1; if (t != null && q != null) { // hoist checks
for (int j = 0;;) { int nstolen = 1;
if (t != null) for (;;) {
t.doExec(); t.doExec();
if (j++ <= n) if (n-- < 0)
t = nextLocalTask();
else {
j = 0;
t = null;
}
if (t == null) {
if (q != null && (t = q.poll()) != null) {
++nstolen;
j = 0;
}
else if (j != 0)
break; break;
else if ((t = nextLocalTask()) == null) {
if ((t = q.poll()) == null)
break;
else
++nstolen;
}
} }
ForkJoinWorkerThread thread = owner;
nsteals += nstolen;
source = 0;
if (thread != null)
thread.afterTopLevelExec();
} }
ForkJoinWorkerThread thread = owner;
nsteals += nstolen;
source = 0;
if (thread != null)
thread.afterTopLevelExec();
} }
/** /**
@ -1456,7 +1455,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (!tryTerminate(false, false) && // possibly replace worker if (!tryTerminate(false, false) && // possibly replace worker
w != null && w.array != null) // avoid repeated failures w != null && w.array != null) // avoid repeated failures
signalWork(null); signalWork();
if (ex == null) // help clean on way out if (ex == null) // help clean on way out
ForkJoinTask.helpExpungeStaleExceptions(); ForkJoinTask.helpExpungeStaleExceptions();
@ -1466,9 +1465,8 @@ public class ForkJoinPool extends AbstractExecutorService {
/** /**
* Tries to create or release a worker if too few are running. * Tries to create or release a worker if too few are running.
* @param q if non-null recheck if empty on CAS failure
*/ */
final void signalWork(WorkQueue q) { final void signalWork() {
for (;;) { for (;;) {
long c; int sp; WorkQueue[] ws; int i; WorkQueue v; long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
if ((c = ctl) >= 0L) // enough workers if ((c = ctl) >= 0L) // enough workers
@ -1495,8 +1493,6 @@ public class ForkJoinPool extends AbstractExecutorService {
LockSupport.unpark(vt); LockSupport.unpark(vt);
break; break;
} }
else if (q != null && q.isEmpty()) // no need to retry
break;
} }
} }
} }
@ -1617,24 +1613,19 @@ public class ForkJoinPool extends AbstractExecutorService {
else if (rc <= 0 && (md & SHUTDOWN) != 0 && else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
tryTerminate(false, false)) tryTerminate(false, false))
break; // quiescent shutdown break; // quiescent shutdown
else if (w.phase < 0) { else if (rc <= 0 && pred != 0 && phase == (int)c) {
if (rc <= 0 && pred != 0 && phase == (int)c) { long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); long d = keepAlive + System.currentTimeMillis();
long d = keepAlive + System.currentTimeMillis(); LockSupport.parkUntil(this, d);
LockSupport.parkUntil(this, d); if (ctl == c && // drop on timeout if all idle
if (ctl == c && // drop on timeout if all idle d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
d - System.currentTimeMillis() <= TIMEOUT_SLOP && CTL.compareAndSet(this, c, nc)) {
CTL.compareAndSet(this, c, nc)) { w.phase = QUIET;
w.phase = QUIET; break;
break;
}
}
else {
LockSupport.park(this);
if (w.phase < 0) // one spurious wakeup check
LockSupport.park(this);
} }
} }
else if (w.phase < 0)
LockSupport.park(this); // OK if spuriously woken
w.source = 0; // disable signal w.source = 0; // disable signal
} }
} }
@ -1650,8 +1641,8 @@ public class ForkJoinPool extends AbstractExecutorService {
WorkQueue[] ws; int n; WorkQueue[] ws; int n;
if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) { if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
for (int m = n - 1, j = r & m;;) { for (int m = n - 1, j = r & m;;) {
WorkQueue q; int b, s; WorkQueue q; int b;
if ((q = ws[j]) != null && (s = q.top) != (b = q.base)) { if ((q = ws[j]) != null && q.top != (b = q.base)) {
int qid = q.id; int qid = q.id;
ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t; ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
if ((a = q.array) != null && (cap = a.length) > 0) { if ((a = q.array) != null && (cap = a.length) > 0) {
@ -1660,10 +1651,10 @@ public class ForkJoinPool extends AbstractExecutorService {
QA.compareAndSet(a, k, t, null)) { QA.compareAndSet(a, k, t, null)) {
q.base = b; q.base = b;
w.source = qid; w.source = qid;
if (s != b && a[(cap - 1) & b] != null) if (q.top - b > 0)
signalWork(q); // help signal if more tasks signalWork();
w.topLevelExec(t, q, // random fairness bound w.topLevelExec(t, q, // random fairness bound
(r | (1 << TOP_BOUND_SHIFT)) & SMASK); r & ((n << TOP_BOUND_SHIFT) - 1));
} }
} }
return true; return true;
@ -1909,7 +1900,7 @@ public class ForkJoinPool extends AbstractExecutorService {
r = ThreadLocalRandom.advanceProbe(r); r = ThreadLocalRandom.advanceProbe(r);
else { else {
if (q.lockedPush(task)) if (q.lockedPush(task))
signalWork(null); signalWork();
return; return;
} }
} }