8246585: ForkJoin updates

8229253: forkjoin/FJExceptionTableLeak.java fails "AssertionError: failed to satisfy condition"

Reviewed-by: dl
This commit is contained in:
Martin Buchholz 2021-01-09 20:57:52 +00:00
parent 6472104e18
commit 5cfa8c94d6
9 changed files with 2716 additions and 2219 deletions

View file

@ -357,7 +357,7 @@ import java.lang.invoke.VarHandle;
* within this method to ensure thread safety of accesses to fields of
* this task or other completed tasks.
*
* <p><b>Completion Traversals</b>. If using {@code onCompletion} to
* <p><b>Completion Traversals.</b> If using {@code onCompletion} to
* process completions is inapplicable or inconvenient, you can use
* methods {@link #firstComplete} and {@link #nextComplete} to create
* custom traversals. For example, to define a MapReducer that only
@ -553,6 +553,11 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
return PENDING.compareAndSet(this, expected, count);
}
// internal-only weak version
final boolean weakCompareAndSetPendingCount(int expected, int count) {
return PENDING.weakCompareAndSet(this, expected, count);
}
/**
* If the pending count is nonzero, (atomically) decrements it.
*
@ -562,7 +567,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
public final int decrementPendingCountUnlessZero() {
int c;
do {} while ((c = pending) != 0 &&
!PENDING.weakCompareAndSet(this, c, c - 1));
!weakCompareAndSetPendingCount(c, c - 1));
return c;
}
@ -595,7 +600,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
return;
}
}
else if (PENDING.weakCompareAndSet(a, c, c - 1))
else if (a.weakCompareAndSetPendingCount(c, c - 1))
return;
}
}
@ -618,7 +623,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
return;
}
}
else if (PENDING.weakCompareAndSet(a, c, c - 1))
else if (a.weakCompareAndSetPendingCount(c, c - 1))
return;
}
}
@ -663,7 +668,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
for (int c;;) {
if ((c = pending) == 0)
return this;
else if (PENDING.weakCompareAndSet(this, c, c - 1))
else if (weakCompareAndSetPendingCount(c, c - 1))
return null;
}
}
@ -718,30 +723,33 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* processed.
*/
public final void helpComplete(int maxTasks) {
Thread t; ForkJoinWorkerThread wt;
if (maxTasks > 0 && status >= 0) {
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
(wt = (ForkJoinWorkerThread)t).pool.
helpComplete(wt.workQueue, this, maxTasks);
ForkJoinPool.WorkQueue q; Thread t; boolean owned;
if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
ForkJoinPool.common.externalHelpComplete(this, maxTasks);
}
q = ForkJoinPool.commonQueue();
if (q != null && maxTasks > 0)
q.helpComplete(this, owned, maxTasks);
}
// ForkJoinTask overrides
/**
* Supports ForkJoinTask exception propagation.
*/
void internalPropagateException(Throwable ex) {
CountedCompleter<?> a = this, s = a;
while (a.onExceptionalCompletion(ex, s) &&
(a = (s = a).completer) != null && a.status >= 0 &&
isExceptionalStatus(a.recordExceptionalCompletion(ex)))
;
@Override
final int trySetException(Throwable ex) {
CountedCompleter<?> a = this, p = a;
do {} while (isExceptionalStatus(a.trySetThrown(ex)) &&
a.onExceptionalCompletion(ex, p) &&
(a = (p = a).completer) != null && a.status >= 0);
return status;
}
/**
* Implements execution conventions for CountedCompleters.
*/
@Override
protected final boolean exec() {
compute();
return false;
@ -756,6 +764,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
*
* @return the result of the computation
*/
@Override
public T getRawResult() { return null; }
/**
@ -765,6 +774,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* overridden to update existing objects or fields, then it must
* in general be defined to be thread-safe.
*/
@Override
protected void setRawResult(T t) { }
// VarHandle mechanics

View file

@ -35,10 +35,8 @@
package java.util.concurrent;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
/**
* A thread managed by a {@link ForkJoinPool}, which executes
@ -62,26 +60,38 @@ public class ForkJoinWorkerThread extends Thread {
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*
* This class just maintains links to its pool and WorkQueue. The
* pool field is set immediately upon construction, but the
* workQueue field is not set until a call to registerWorker
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapsulation (via helper
* methods in ThreadLocalRandom) both here and in the subclass to
* access and set Thread fields.
* This class just maintains links to its pool and WorkQueue.
*/
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
/** An AccessControlContext supporting no privileges */
private static final AccessControlContext INNOCUOUS_ACC =
new AccessControlContext(
new ProtectionDomain[] { new ProtectionDomain(null, null) });
/**
* Full nonpublic constructor.
*/
ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,
boolean useSystemClassLoader, boolean isInnocuous) {
super(group, null, pool.nextWorkerThreadName(), 0L);
UncaughtExceptionHandler handler = (this.pool = pool).ueh;
this.workQueue = new ForkJoinPool.WorkQueue(this, isInnocuous);
super.setDaemon(true);
if (handler != null)
super.setUncaughtExceptionHandler(handler);
if (useSystemClassLoader)
super.setContextClassLoader(ClassLoader.getSystemClassLoader());
}
/**
* Creates a ForkJoinWorkerThread operating in the given thread group and
* pool.
*
* @param group if non-null, the thread group for this thread
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
/* TODO: protected */ ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool) {
this(group, pool, false, false);
}
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
@ -90,38 +100,7 @@ public class ForkJoinWorkerThread extends Thread {
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/**
* Version for use by the default pool. Supports setting the
* context class loader. This is a separate constructor to avoid
* affecting the protected constructor.
*/
ForkJoinWorkerThread(ForkJoinPool pool, ClassLoader ccl) {
super("aForkJoinWorkerThread");
super.setContextClassLoader(ccl);
ThreadLocalRandom.setInheritedAccessControlContext(this, INNOCUOUS_ACC);
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/**
* Version for InnocuousForkJoinWorkerThread.
*/
ForkJoinWorkerThread(ForkJoinPool pool,
ClassLoader ccl,
ThreadGroup threadGroup,
AccessControlContext acc) {
super(threadGroup, null, "aForkJoinWorkerThread");
super.setContextClassLoader(ccl);
ThreadLocalRandom.setInheritedAccessControlContext(this, acc);
ThreadLocalRandom.eraseThreadLocals(this); // clear before registering
this.pool = pool;
this.workQueue = pool.registerWorker(this);
this(null, pool, false, false);
}
/**
@ -176,11 +155,14 @@ public class ForkJoinWorkerThread extends Thread {
* {@link ForkJoinTask}s.
*/
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
ForkJoinPool p = pool;
ForkJoinPool.WorkQueue w = workQueue;
if (p != null && w != null) { // skip on failed initialization
try {
p.registerWorker(w);
onStart();
pool.runWorker(workQueue);
p.runWorker(w);
} catch (Throwable ex) {
exception = ex;
} finally {
@ -190,18 +172,12 @@ public class ForkJoinWorkerThread extends Thread {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
p.deregisterWorker(this, exception);
}
}
}
}
/**
* Non-public hook method for InnocuousForkJoinWorkerThread.
*/
void afterTopLevelExec() {
}
/**
* A worker thread that has no permissions, is not a member of any
* user-defined ThreadGroup, uses the system class loader as
@ -221,15 +197,7 @@ public class ForkJoinWorkerThread extends Thread {
}});
InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
super(pool,
ClassLoader.getSystemClassLoader(),
innocuousThreadGroup,
INNOCUOUS_ACC);
}
@Override // to erase ThreadLocals
void afterTopLevelExec() {
ThreadLocalRandom.eraseThreadLocals(this);
super(innocuousThreadGroup, pool, true, true);
}
@Override // to silently fail

View file

@ -0,0 +1,177 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @run testng AsyncShutdownNow
* @summary Test invoking shutdownNow with threads blocked in Future.get,
* invokeAll, and invokeAny
*/
// TODO: this test is far too slow
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class AsyncShutdownNow {
// long running interruptible task
private static final Callable<Void> SLEEP_FOR_A_DAY = () -> {
Thread.sleep(86400_000);
return null;
};
private ScheduledExecutorService scheduledExecutor;
@BeforeClass
public void setup() {
scheduledExecutor = Executors.newScheduledThreadPool(1);
}
@AfterClass
public void teardown() {
scheduledExecutor.shutdown();
}
/**
* Schedule the given executor service to be shutdown abruptly after the given
* delay, in seconds.
*/
private void scheduleShutdownNow(ExecutorService executor, int delayInSeconds) {
scheduledExecutor.schedule(() -> {
executor.shutdownNow();
return null;
}, delayInSeconds, TimeUnit.SECONDS);
}
/**
* The executors to test.
*/
@DataProvider(name = "executors")
public Object[][] executors() {
return new Object[][] {
{ new ForkJoinPool() },
{ new ForkJoinPool(1) },
};
}
/**
* Test shutdownNow with running task and thread blocked in Future::get.
*/
@Test(dataProvider = "executors")
public void testFutureGet(ExecutorService executor) throws Exception {
System.out.format("testFutureGet: %s%n", executor);
scheduleShutdownNow(executor, 5);
try {
// submit long running task, the task should be cancelled
Future<?> future = executor.submit(SLEEP_FOR_A_DAY);
try {
future.get();
assertTrue(false);
} catch (ExecutionException e) {
// expected
}
} finally {
executor.shutdown();
}
}
/**
* Test shutdownNow with running task and thread blocked in a timed Future::get.
*/
@Test(dataProvider = "executors")
public void testTimedFutureGet(ExecutorService executor) throws Exception {
System.out.format("testTimedFutureGet: %s%n", executor);
scheduleShutdownNow(executor, 5);
try {
// submit long running task, the task should be cancelled
Future<?> future = executor.submit(SLEEP_FOR_A_DAY);
try {
future.get(1, TimeUnit.HOURS);
assertTrue(false);
} catch (ExecutionException e) {
// expected
}
} finally {
executor.shutdown();
}
}
/**
* Test shutdownNow with thread blocked in invokeAll.
*/
@Test(dataProvider = "executors")
public void testInvokeAll(ExecutorService executor) throws Exception {
System.out.format("testInvokeAll: %s%n", executor);
scheduleShutdownNow(executor, 5);
try {
// execute long running tasks
List<Future<Void>> futures = executor.invokeAll(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY));
for (Future<Void> f : futures) {
assertTrue(f.isDone());
try {
Object result = f.get();
assertTrue(false);
} catch (ExecutionException | CancellationException e) {
// expected
}
}
} finally {
executor.shutdown();
}
}
/**
* Test shutdownNow with thread blocked in invokeAny.
*/
@Test(dataProvider = "executors")
public void testInvokeAny(ExecutorService executor) throws Exception {
System.out.format("testInvokeAny: %s%n", executor);
scheduleShutdownNow(executor, 5);
try {
try {
// execute long running tasks
executor.invokeAny(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY));
assertTrue(false);
} catch (ExecutionException e) {
// expected
}
} finally {
executor.shutdown();
}
}
}

View file

@ -0,0 +1,100 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @run testng AsyncShutdownNowInvokeAny
* @summary A variant of AsyncShutdownNow useful for race bug hunting
*/
// TODO: reorganize all of the AsyncShutdown tests
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class AsyncShutdownNowInvokeAny {
// long running interruptible task
private static final Callable<Void> SLEEP_FOR_A_DAY = () -> {
Thread.sleep(86400_000);
return null;
};
private ScheduledExecutorService scheduledExecutor;
@BeforeClass
public void setup() {
scheduledExecutor = Executors.newScheduledThreadPool(1);
}
@AfterClass
public void teardown() {
scheduledExecutor.shutdown();
}
/**
* Schedule the given executor service to be shutdown abruptly after the given
* delay, in seconds.
*/
private void scheduleShutdownNow(ExecutorService executor, int delayInSeconds) {
scheduledExecutor.schedule(() -> {
executor.shutdownNow();
return null;
}, delayInSeconds, TimeUnit.SECONDS);
}
/**
* Test shutdownNow with thread blocked in invokeAny.
*/
@Test
public void testInvokeAny() throws Exception {
final int reps = 4;
for (int rep = 1; rep < reps; rep++) {
ExecutorService pool = new ForkJoinPool(1);
scheduleShutdownNow(pool, 5);
try {
try {
// execute long running tasks
pool.invokeAny(List.of(SLEEP_FOR_A_DAY, SLEEP_FOR_A_DAY));
assertTrue(false);
} catch (ExecutionException e) {
// expected
}
} finally {
pool.shutdown();
}
}
}
}

View file

@ -0,0 +1,199 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @run testng AsyncShutdownNowInvokeAnyRace
* @summary A variant of AsyncShutdownNow useful for race bug hunting
*/
// TODO: reorganize all of the AsyncShutdown tests
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.lang.management.ManagementFactory;
import java.lang.management.LockInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class AsyncShutdownNowInvokeAnyRace {
// TODO: even more jitter-inducing parallelism?
// int nThreads = ThreadLocalRandom.current().nextInt(1, 50);
// ExecutorService pool = Executors.newCachedThreadPool();
// Callable<Void> task = () -> { testInvokeAny_1(); return null; };
// List<Callable<Void>> tasks = Collections.nCopies(nThreads, task);
// try {
// for (Future<Void> future : pool.invokeAll(tasks)) {
// future.get();
// }
// } finally {
// pool.shutdown();
// }
// }
// public void testInvokeAny_1() throws Exception {
/**
* Test shutdownNow with thread blocked in invokeAny.
*/
@Test
public void testInvokeAny() throws Exception {
final int reps = 30_000;
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int falseAlarms = 0;
for (int rep = 1; rep < reps; rep++) {
ForkJoinPool pool = new ForkJoinPool(1);
CountDownLatch pleaseShutdownNow = new CountDownLatch(1);
int nTasks = rnd.nextInt(2, 5);
AtomicInteger threadsStarted = new AtomicInteger(0);
AtomicReference<String> poolAtShutdownNow = new AtomicReference<>();
Callable<Void> blockPool = () -> {
threadsStarted.getAndIncrement();
// await submission quiescence; may false-alarm
// TODO: consider re-checking to reduce false alarms
while (threadsStarted.get() + pool.getQueuedSubmissionCount() < nTasks)
Thread.yield();
pleaseShutdownNow.countDown();
Thread.sleep(86400_000);
return null;
};
List<Callable<Void>> tasks = Collections.nCopies(nTasks, blockPool);
Runnable shutdown = () -> {
try {
pleaseShutdownNow.await();
poolAtShutdownNow.set(pool.toString());
pool.shutdownNow();
} catch (Throwable t) { throw new AssertionError(t); }
};
Future<Void> shutdownResult = CompletableFuture.runAsync(shutdown);
try {
try {
if (rnd.nextBoolean())
pool.invokeAny(tasks, 10L, SECONDS);
else
pool.invokeAny(tasks);
throw new AssertionError("should throw");
} catch (RejectedExecutionException re) {
falseAlarms++;
} catch (CancellationException re) {
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
if (!(cause instanceof InterruptedException) &&
!(cause instanceof CancellationException))
throw ex;
} catch (TimeoutException ex) {
dumpTestThreads();
int i = rep;
String detail = String.format(
"invokeAny timed out, "
+ "nTasks=%d rep=%d threadsStarted=%d%n"
+ "poolAtShutdownNow=%s%n"
+ "poolAtTimeout=%s%n"
+ "queuedTaskCount=%d queuedSubmissionCount=%d",
nTasks, i, threadsStarted.get(),
poolAtShutdownNow,
pool,
pool.getQueuedTaskCount(),
pool.getQueuedSubmissionCount());
throw new AssertionError(detail, ex);
}
} finally {
pool.shutdown();
}
if (falseAlarms != 0)
System.out.println("Premature shutdowns = " + falseAlarms);
shutdownResult.get();
}
}
//--- thread stack dumping (from JSR166TestCase.java) ---
private static final ThreadMXBean THREAD_MXBEAN
= ManagementFactory.getThreadMXBean();
/** Returns true if thread info might be useful in a thread dump. */
static boolean threadOfInterest(ThreadInfo info) {
final String name = info.getThreadName();
String lockName;
if (name == null)
return true;
if (name.equals("Signal Dispatcher")
|| name.equals("WedgedTestDetector"))
return false;
if (name.equals("Reference Handler")) {
// Reference Handler stacktrace changed in JDK-8156500
StackTraceElement[] stackTrace; String methodName;
if ((stackTrace = info.getStackTrace()) != null
&& stackTrace.length > 0
&& (methodName = stackTrace[0].getMethodName()) != null
&& methodName.equals("waitForReferencePendingList"))
return false;
// jdk8 Reference Handler stacktrace
if ((lockName = info.getLockName()) != null
&& lockName.startsWith("java.lang.ref"))
return false;
}
if ((name.equals("Finalizer") || name.equals("Common-Cleaner"))
&& (lockName = info.getLockName()) != null
&& lockName.startsWith("java.lang.ref"))
return false;
if (name.startsWith("ForkJoinPool.commonPool-worker")
&& (lockName = info.getLockName()) != null
&& lockName.startsWith("java.util.concurrent.ForkJoinPool"))
return false;
return true;
}
/**
* A debugging tool to print stack traces of most threads, as jstack does.
* Uninteresting threads are filtered out.
*/
static void dumpTestThreads() {
System.err.println("------ stacktrace dump start ------");
for (ThreadInfo info : THREAD_MXBEAN.dumpAllThreads(true, true))
if (threadOfInterest(info))
System.err.print(info);
System.err.println("------ stacktrace dump end ------");
}
}

View file

@ -1,204 +0,0 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea and Martin Buchholz with assistance from
* members of JCP JSR-166 Expert Group and released to the public
* domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
/*
* @test
* @bug 8004138 8205576
* @modules java.base/java.util.concurrent:open
* @run testng FJExceptionTableLeak
* @summary Checks that ForkJoinTask thrown exceptions are not leaked.
* This whitebox test is sensitive to forkjoin implementation details.
*/
import static org.testng.Assert.*;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
@Test
public class FJExceptionTableLeak {
final ThreadLocalRandom rnd = ThreadLocalRandom.current();
final VarHandle NEXT, EX;
final Object[] exceptionTable;
final ReentrantLock exceptionTableLock;
FJExceptionTableLeak() throws ReflectiveOperationException {
MethodHandles.Lookup lookup = MethodHandles.privateLookupIn(
ForkJoinTask.class, MethodHandles.lookup());
Class<?> nodeClass = Class.forName(
ForkJoinTask.class.getName() + "$ExceptionNode");
VarHandle exceptionTableHandle = lookup.findStaticVarHandle(
ForkJoinTask.class, "exceptionTable", arrayClass(nodeClass));
VarHandle exceptionTableLockHandle = lookup.findStaticVarHandle(
ForkJoinTask.class, "exceptionTableLock", ReentrantLock.class);
exceptionTable = (Object[]) exceptionTableHandle.get();
exceptionTableLock = (ReentrantLock) exceptionTableLockHandle.get();
NEXT = lookup.findVarHandle(nodeClass, "next", nodeClass);
EX = lookup.findVarHandle(nodeClass, "ex", Throwable.class);
}
static <T> Class<T[]> arrayClass(Class<T> klazz) {
try {
return (Class<T[]>) Class.forName("[L" + klazz.getName() + ";");
} catch (ReflectiveOperationException ex) {
throw new Error(ex);
}
}
Object next(Object node) { return NEXT.get(node); }
Throwable ex(Object node) { return (Throwable) EX.get(node); }
static class FailingTaskException extends RuntimeException {}
static class FailingTask extends RecursiveAction {
public void compute() { throw new FailingTaskException(); }
}
/** Counts all FailingTaskExceptions still recorded in exceptionTable. */
int retainedExceptions() {
exceptionTableLock.lock();
try {
int count = 0;
for (Object node : exceptionTable)
for (; node != null; node = next(node))
if (ex(node) instanceof FailingTaskException)
count++;
return count;
} finally {
exceptionTableLock.unlock();
}
}
@Test
public void exceptionTableCleanup() throws Exception {
ArrayList<FailingTask> failedTasks = failedTasks();
// Retain a strong ref to one last failing task
FailingTask lastTask = failedTasks.get(rnd.nextInt(failedTasks.size()));
// Clear all other strong refs, making exception table cleanable
failedTasks.clear();
BooleanSupplier exceptionTableIsClean = () -> {
try {
// Trigger exception table expunging as side effect
lastTask.join();
throw new AssertionError("should throw");
} catch (FailingTaskException expected) {}
int count = retainedExceptions();
if (count == 0)
throw new AssertionError("expected to find last task");
return count == 1;
};
gcAwait(exceptionTableIsClean);
}
/** Sequestered into a separate method to inhibit GC retention. */
ArrayList<FailingTask> failedTasks()
throws Exception {
final ForkJoinPool pool = new ForkJoinPool(rnd.nextInt(1, 4));
assertEquals(0, retainedExceptions());
final ArrayList<FailingTask> tasks = new ArrayList<>();
for (int i = exceptionTable.length; i--> 0; ) {
FailingTask task = new FailingTask();
pool.execute(task);
tasks.add(task); // retain strong refs to all tasks, for now
task = null; // excessive GC retention paranoia
}
for (FailingTask task : tasks) {
try {
task.join();
throw new AssertionError("should throw");
} catch (FailingTaskException success) {}
task = null; // excessive GC retention paranoia
}
if (rnd.nextBoolean())
gcAwait(() -> retainedExceptions() == tasks.size());
return tasks;
}
// --------------- GC finalization infrastructure ---------------
/** No guarantees, but effective in practice. */
static void forceFullGc() {
long timeoutMillis = 1000L;
CountDownLatch finalized = new CountDownLatch(1);
ReferenceQueue<Object> queue = new ReferenceQueue<>();
WeakReference<Object> ref = new WeakReference<>(
new Object() { protected void finalize() { finalized.countDown(); }},
queue);
try {
for (int tries = 3; tries--> 0; ) {
System.gc();
if (finalized.await(timeoutMillis, MILLISECONDS)
&& queue.remove(timeoutMillis) != null
&& ref.get() == null) {
System.runFinalization(); // try to pick up stragglers
return;
}
timeoutMillis *= 4;
}
} catch (InterruptedException unexpected) {
throw new AssertionError("unexpected InterruptedException");
}
throw new AssertionError("failed to do a \"full\" gc");
}
static void gcAwait(BooleanSupplier s) {
for (int i = 0; i < 10; i++) {
if (s.getAsBoolean())
return;
forceFullGc();
}
throw new AssertionError("failed to satisfy condition");
}
}

View file

@ -1720,4 +1720,20 @@ public class ForkJoinTaskTest extends JSR166TestCase {
task.toString());
}
}
// adaptInterruptible deferred to its own independent change
// https://bugs.openjdk.java.net/browse/JDK-8246587
// /**
// * adaptInterruptible(callable).toString() contains toString of wrapped task
// */
// public void testAdaptInterruptible_Callable_toString() {
// if (testImplementationDetails) {
// Callable<String> c = () -> "";
// ForkJoinTask<String> task = ForkJoinTask.adaptInterruptible(c);
// assertEquals(
// identityString(task) + "[Wrapped task = " + c.toString() + "]",
// task.toString());
// }
// }
}