mirror of
https://github.com/openjdk/jdk.git
synced 2025-08-27 06:45:07 +02:00
8259800: timeout in tck test testForkJoin(ForkJoinPool8Test)
Reviewed-by: martin, dholmes
This commit is contained in:
parent
419717ddae
commit
5b7b18c5bf
2 changed files with 241 additions and 172 deletions
|
@ -276,7 +276,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
return (int)STATUS.getAndBitwiseOr(this, v);
|
||||
}
|
||||
private boolean casStatus(int c, int v) {
|
||||
return STATUS.weakCompareAndSet(this, c, v);
|
||||
return STATUS.compareAndSet(this, c, v);
|
||||
}
|
||||
private boolean casAux(Aux c, Aux v) {
|
||||
return AUX.compareAndSet(this, c, v);
|
||||
|
@ -295,84 +295,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Possibly blocks until task is done or interrupted or timed out.
|
||||
*
|
||||
* @param interruptible true if wait can be cancelled by interrupt
|
||||
* @param deadline if non-zero use timed waits and possibly timeout
|
||||
* @param pool if nonnull pool to uncompensate after unblocking
|
||||
* @return status on exit, or ABNORMAL if interrupted while waiting
|
||||
*/
|
||||
private int awaitDone(boolean interruptible, long deadline,
|
||||
ForkJoinPool pool) {
|
||||
int s;
|
||||
boolean interrupted = false, queued = false, parked = false;
|
||||
Aux node = null;
|
||||
while ((s = status) >= 0) {
|
||||
Aux a; long ns;
|
||||
if (parked && Thread.interrupted()) {
|
||||
if (interruptible) {
|
||||
s = ABNORMAL;
|
||||
break;
|
||||
}
|
||||
interrupted = true;
|
||||
}
|
||||
else if (queued) {
|
||||
if (deadline != 0L) {
|
||||
if ((ns = deadline - System.nanoTime()) <= 0L)
|
||||
break;
|
||||
LockSupport.parkNanos(ns);
|
||||
}
|
||||
else
|
||||
LockSupport.park();
|
||||
parked = true;
|
||||
}
|
||||
else if (node != null) {
|
||||
if ((a = aux) != null && a.ex != null)
|
||||
Thread.onSpinWait(); // exception in progress
|
||||
else if (queued = casAux(node.next = a, node))
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
}
|
||||
else {
|
||||
try {
|
||||
node = new Aux(Thread.currentThread(), null);
|
||||
} catch (Throwable ex) { // try to cancel if cannot create
|
||||
casStatus(s, s | (DONE | ABNORMAL));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pool != null)
|
||||
pool.uncompensate();
|
||||
|
||||
if (queued) {
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
|
||||
outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
|
||||
for (Aux trail = null;;) {
|
||||
Aux next = a.next;
|
||||
if (a == node) {
|
||||
if (trail != null)
|
||||
trail.casNext(trail, next);
|
||||
else if (casAux(a, next))
|
||||
break outer; // cannot be re-encountered
|
||||
break; // restart
|
||||
} else {
|
||||
trail = a;
|
||||
if ((a = next) == null)
|
||||
break outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalWaiters(); // help clean or signal
|
||||
if (interrupted)
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets DONE status and wakes up threads waiting to join this task.
|
||||
* @return status on exit
|
||||
|
@ -463,27 +385,36 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
* Helps and/or waits for completion from join, get, or invoke;
|
||||
* called from either internal or external threads.
|
||||
*
|
||||
* @param pool if nonnull, known submitted pool, else assumes current pool
|
||||
* @param ran true if task known to have been exec'd
|
||||
* @param interruptible true if park interruptibly when external
|
||||
* @param timed true if use timed wait
|
||||
* @param nanos if timed, timeout value
|
||||
* @return ABNORMAL if interrupted, else status on exit
|
||||
*/
|
||||
private int awaitJoin(boolean ran, boolean interruptible, boolean timed,
|
||||
private int awaitDone(ForkJoinPool pool, boolean ran,
|
||||
boolean interruptible, boolean timed,
|
||||
long nanos) {
|
||||
boolean internal; ForkJoinPool p; ForkJoinPool.WorkQueue q; int s;
|
||||
Thread t; ForkJoinWorkerThread wt;
|
||||
if (internal = ((t = Thread.currentThread())
|
||||
instanceof ForkJoinWorkerThread)) {
|
||||
p = (wt = (ForkJoinWorkerThread)t).pool;
|
||||
q = wt.workQueue;
|
||||
ForkJoinPool p; boolean internal; int s; Thread t;
|
||||
ForkJoinPool.WorkQueue q = null;
|
||||
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
|
||||
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
|
||||
p = wt.pool;
|
||||
if (pool == null)
|
||||
pool = p;
|
||||
if (internal = (pool == p))
|
||||
q = wt.workQueue;
|
||||
}
|
||||
else {
|
||||
internal = false;
|
||||
p = ForkJoinPool.common;
|
||||
q = ForkJoinPool.commonQueue();
|
||||
if (interruptible && Thread.interrupted())
|
||||
return ABNORMAL;
|
||||
if (pool == null)
|
||||
pool = p;
|
||||
if (pool == p && p != null)
|
||||
q = p.externalQueue();
|
||||
}
|
||||
if (interruptible && Thread.interrupted())
|
||||
return ABNORMAL;
|
||||
if ((s = status) < 0)
|
||||
return s;
|
||||
long deadline = 0L;
|
||||
|
@ -493,22 +424,94 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
else if ((deadline = nanos + System.nanoTime()) == 0L)
|
||||
deadline = 1L;
|
||||
}
|
||||
ForkJoinPool uncompensate = null;
|
||||
if (q != null && p != null) { // try helping
|
||||
if ((!timed || p.isSaturated()) &&
|
||||
((this instanceof CountedCompleter) ?
|
||||
(s = p.helpComplete(this, q, internal)) < 0 :
|
||||
(q.tryRemove(this, internal) && (s = doExec()) < 0)))
|
||||
return s;
|
||||
boolean uncompensate = false;
|
||||
if (q != null && p != null) { // try helping
|
||||
// help even in timed mode if pool has no parallelism
|
||||
boolean canHelp = !timed || (p.mode & SMASK) == 0;
|
||||
if (canHelp) {
|
||||
if ((this instanceof CountedCompleter) &&
|
||||
(s = p.helpComplete(this, q, internal)) < 0)
|
||||
return s;
|
||||
if (!ran && ((!internal && q.externalTryUnpush(this)) ||
|
||||
q.tryRemove(this, internal)) && (s = doExec()) < 0)
|
||||
return s;
|
||||
}
|
||||
if (internal) {
|
||||
if ((s = p.helpJoin(this, q)) < 0)
|
||||
if ((s = p.helpJoin(this, q, canHelp)) < 0)
|
||||
return s;
|
||||
if (s == UNCOMPENSATE)
|
||||
uncompensate = p;
|
||||
interruptible = false;
|
||||
uncompensate = true;
|
||||
}
|
||||
}
|
||||
return awaitDone(interruptible, deadline, uncompensate);
|
||||
// block until done or cancelled wait
|
||||
boolean interrupted = false, queued = false;
|
||||
boolean parked = false, fail = false;
|
||||
Aux node = null;
|
||||
while ((s = status) >= 0) {
|
||||
Aux a; long ns;
|
||||
if (fail || (fail = (pool != null && pool.mode < 0)))
|
||||
casStatus(s, s | (DONE | ABNORMAL)); // try to cancel
|
||||
else if (parked && Thread.interrupted()) {
|
||||
if (interruptible) {
|
||||
s = ABNORMAL;
|
||||
break;
|
||||
}
|
||||
interrupted = true;
|
||||
}
|
||||
else if (queued) {
|
||||
if (deadline != 0L) {
|
||||
if ((ns = deadline - System.nanoTime()) <= 0L)
|
||||
break;
|
||||
LockSupport.parkNanos(ns);
|
||||
}
|
||||
else
|
||||
LockSupport.park();
|
||||
parked = true;
|
||||
}
|
||||
else if (node != null) {
|
||||
if ((a = aux) != null && a.ex != null)
|
||||
Thread.onSpinWait(); // exception in progress
|
||||
else if (queued = casAux(node.next = a, node))
|
||||
LockSupport.setCurrentBlocker(this);
|
||||
}
|
||||
else {
|
||||
try {
|
||||
node = new Aux(Thread.currentThread(), null);
|
||||
} catch (Throwable ex) { // cannot create
|
||||
fail = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pool != null && uncompensate)
|
||||
pool.uncompensate();
|
||||
|
||||
if (queued) {
|
||||
LockSupport.setCurrentBlocker(null);
|
||||
if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
|
||||
outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
|
||||
for (Aux trail = null;;) {
|
||||
Aux next = a.next;
|
||||
if (a == node) {
|
||||
if (trail != null)
|
||||
trail.casNext(trail, next);
|
||||
else if (casAux(a, next))
|
||||
break outer; // cannot be re-encountered
|
||||
break; // restart
|
||||
} else {
|
||||
trail = a;
|
||||
if ((a = next) == null)
|
||||
break outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
signalWaiters(); // help clean or signal
|
||||
if (interrupted)
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -664,7 +667,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
public final V join() {
|
||||
int s;
|
||||
if ((s = status) >= 0)
|
||||
s = awaitJoin(false, false, false, 0L);
|
||||
s = awaitDone(null, false, false, false, 0L);
|
||||
if ((s & ABNORMAL) != 0)
|
||||
reportException(s);
|
||||
return getRawResult();
|
||||
|
@ -681,7 +684,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
public final V invoke() {
|
||||
int s;
|
||||
if ((s = doExec()) >= 0)
|
||||
s = awaitJoin(true, false, false, 0L);
|
||||
s = awaitDone(null, true, false, false, 0L);
|
||||
if ((s & ABNORMAL) != 0)
|
||||
reportException(s);
|
||||
return getRawResult();
|
||||
|
@ -710,12 +713,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
throw new NullPointerException();
|
||||
t2.fork();
|
||||
if ((s1 = t1.doExec()) >= 0)
|
||||
s1 = t1.awaitJoin(true, false, false, 0L);
|
||||
s1 = t1.awaitDone(null, true, false, false, 0L);
|
||||
if ((s1 & ABNORMAL) != 0) {
|
||||
cancelIgnoringExceptions(t2);
|
||||
t1.reportException(s1);
|
||||
}
|
||||
else if (((s2 = t2.awaitJoin(false, false, false, 0L)) & ABNORMAL) != 0)
|
||||
else if (((s2 = t2.awaitDone(null, false, false, false, 0L)) & ABNORMAL) != 0)
|
||||
t2.reportException(s2);
|
||||
}
|
||||
|
||||
|
@ -746,7 +749,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
if (i == 0) {
|
||||
int s;
|
||||
if ((s = t.doExec()) >= 0)
|
||||
s = t.awaitJoin(true, false, false, 0L);
|
||||
s = t.awaitDone(null, true, false, false, 0L);
|
||||
if ((s & ABNORMAL) != 0)
|
||||
ex = t.getException(s);
|
||||
break;
|
||||
|
@ -759,7 +762,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
if ((t = tasks[i]) != null) {
|
||||
int s;
|
||||
if ((s = t.status) >= 0)
|
||||
s = t.awaitJoin(false, false, false, 0L);
|
||||
s = t.awaitDone(null, false, false, false, 0L);
|
||||
if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
|
||||
break;
|
||||
}
|
||||
|
@ -809,7 +812,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
if (i == 0) {
|
||||
int s;
|
||||
if ((s = t.doExec()) >= 0)
|
||||
s = t.awaitJoin(true, false, false, 0L);
|
||||
s = t.awaitDone(null, true, false, false, 0L);
|
||||
if ((s & ABNORMAL) != 0)
|
||||
ex = t.getException(s);
|
||||
break;
|
||||
|
@ -822,7 +825,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
if ((t = ts.get(i)) != null) {
|
||||
int s;
|
||||
if ((s = t.status) >= 0)
|
||||
s = t.awaitJoin(false, false, false, 0L);
|
||||
s = t.awaitDone(null, false, false, false, 0L);
|
||||
if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
|
||||
break;
|
||||
}
|
||||
|
@ -973,8 +976,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
* member of a ForkJoinPool and was interrupted while waiting
|
||||
*/
|
||||
public final V get() throws InterruptedException, ExecutionException {
|
||||
int s;
|
||||
if (((s = awaitJoin(false, true, false, 0L)) & ABNORMAL) != 0)
|
||||
int s = awaitDone(null, false, true, false, 0L);
|
||||
if ((s & ABNORMAL) != 0)
|
||||
reportExecutionException(s);
|
||||
return getRawResult();
|
||||
}
|
||||
|
@ -995,9 +998,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
*/
|
||||
public final V get(long timeout, TimeUnit unit)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
int s;
|
||||
if ((s = awaitJoin(false, true, true, unit.toNanos(timeout))) >= 0 ||
|
||||
(s & ABNORMAL) != 0)
|
||||
long nanos = unit.toNanos(timeout);
|
||||
int s = awaitDone(null, false, true, true, nanos);
|
||||
if (s >= 0 || (s & ABNORMAL) != 0)
|
||||
reportExecutionException(s);
|
||||
return getRawResult();
|
||||
}
|
||||
|
@ -1010,9 +1013,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
*/
|
||||
public final void quietlyJoin() {
|
||||
if (status >= 0)
|
||||
awaitJoin(false, false, false, 0L);
|
||||
awaitDone(null, false, false, false, 0L);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Commences performing this task and awaits its completion if
|
||||
* necessary, without returning its result or throwing its
|
||||
|
@ -1020,7 +1024,37 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
|
|||
*/
|
||||
public final void quietlyInvoke() {
|
||||
if (doExec() >= 0)
|
||||
awaitJoin(true, false, false, 0L);
|
||||
awaitDone(null, true, false, false, 0L);
|
||||
}
|
||||
|
||||
// Versions of join/get for pool.invoke* methods that use external,
|
||||
// possibly-non-commonPool submits
|
||||
|
||||
final void awaitPoolInvoke(ForkJoinPool pool) {
|
||||
awaitDone(pool, false, false, false, 0L);
|
||||
}
|
||||
final void awaitPoolInvoke(ForkJoinPool pool, long nanos) {
|
||||
awaitDone(pool, false, true, true, nanos);
|
||||
}
|
||||
final V joinForPoolInvoke(ForkJoinPool pool) {
|
||||
int s = awaitDone(pool, false, false, false, 0L);
|
||||
if ((s & ABNORMAL) != 0)
|
||||
reportException(s);
|
||||
return getRawResult();
|
||||
}
|
||||
final V getForPoolInvoke(ForkJoinPool pool)
|
||||
throws InterruptedException, ExecutionException {
|
||||
int s = awaitDone(pool, false, true, false, 0L);
|
||||
if ((s & ABNORMAL) != 0)
|
||||
reportExecutionException(s);
|
||||
return getRawResult();
|
||||
}
|
||||
final V getForPoolInvoke(ForkJoinPool pool, long nanos)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
int s = awaitDone(pool, false, true, true, nanos);
|
||||
if (s >= 0 || (s & ABNORMAL) != 0)
|
||||
reportExecutionException(s);
|
||||
return getRawResult();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue