This commit is contained in:
Jesper Wilhelmsson 2021-01-19 22:49:44 +00:00
commit cf25383d19
2 changed files with 62 additions and 59 deletions

View file

@ -1915,45 +1915,49 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* throws TimeoutException on timeout. * throws TimeoutException on timeout.
*/ */
private Object timedGet(long nanos) throws TimeoutException { private Object timedGet(long nanos) throws TimeoutException {
if (Thread.interrupted()) long d = System.nanoTime() + nanos;
return null; long deadline = (d == 0L) ? 1L : d; // avoid 0
if (nanos > 0L) { boolean interrupted = false, queued = false;
long d = System.nanoTime() + nanos; Signaller q = null;
long deadline = (d == 0L) ? 1L : d; // avoid 0 Object r = null;
Signaller q = null; for (;;) { // order of checking interrupt, result, timeout matters
boolean queued = false; if (interrupted || (interrupted = Thread.interrupted()))
Object r; break;
while ((r = result) == null) { // similar to untimed else if ((r = result) != null)
if (q == null) { break;
q = new Signaller(true, nanos, deadline); else if (nanos <= 0L)
if (Thread.currentThread() instanceof ForkJoinWorkerThread) break;
ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); else if (q == null) {
} q = new Signaller(true, nanos, deadline);
else if (!queued) if (Thread.currentThread() instanceof ForkJoinWorkerThread)
queued = tryPushStack(q); ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);
else if (q.nanos <= 0L) }
break; else if (!queued)
else { queued = tryPushStack(q);
try { else {
ForkJoinPool.managedBlock(q); try {
} catch (InterruptedException ie) { ForkJoinPool.managedBlock(q);
q.interrupted = true; interrupted = q.interrupted;
} nanos = q.nanos;
if (q.interrupted) } catch (InterruptedException ie) {
break; interrupted = true;
} }
} }
if (q != null && queued) {
q.thread = null;
if (r == null)
cleanStack();
}
if (r != null || (r = result) != null)
postComplete();
if (r != null || (q != null && q.interrupted))
return r;
} }
throw new TimeoutException(); if (q != null) {
q.thread = null;
if (r == null)
cleanStack();
}
if (r != null) {
if (interrupted)
Thread.currentThread().interrupt();
postComplete();
return r;
} else if (interrupted)
return null;
else
throw new TimeoutException();
} }
/* ------------- public methods -------------- */ /* ------------- public methods -------------- */

View file

@ -24,6 +24,7 @@
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import static java.util.concurrent.TimeUnit.DAYS;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/* /*
@ -33,57 +34,55 @@ import java.util.concurrent.atomic.AtomicReference;
* @key randomness * @key randomness
*/ */
// TODO: incorporate into CompletableFuture tck tests
public class SwallowedInterruptedException { public class SwallowedInterruptedException {
static final int ITERATIONS = 100; static final int ITERATIONS = 100;
public static void main(String[] args) throws Throwable { public static void main(String[] args) throws Throwable {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 1; i <= ITERATIONS; i++) { for (int i = 1; i <= ITERATIONS; i++) {
System.out.format("Iteration %d%n", i); boolean timed = rnd.nextBoolean();
long sleepMillis = rnd.nextLong(10);
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
CountDownLatch running = new CountDownLatch(1); CountDownLatch threadRunning = new CountDownLatch(1);
AtomicReference<String> failed = new AtomicReference<>(); AtomicReference<Throwable> fail = new AtomicReference<>();
Thread thread = new Thread(() -> { Thread thread = new Thread(() -> {
// signal main thread that child is running threadRunning.countDown();
running.countDown();
// invoke Future.get, it complete with the interrupt status set or
// else throw InterruptedException with the interrupt status not set.
try { try {
future.get(); Void result = (timed) ? future.get(1, DAYS) : future.get();
// interrupt status should be set
if (!Thread.currentThread().isInterrupted()) { if (!Thread.currentThread().isInterrupted()) {
failed.set("Future.get completed with interrupt status not set"); fail.set(new AssertionError(
"Future.get completed with interrupt status not set"));
} }
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
// interrupt status should be cleared
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
failed.set("InterruptedException with interrupt status set"); fail.set(new AssertionError(
"InterruptedException with interrupt status set"));
} }
} catch (Throwable ex) { } catch (Throwable ex) {
failed.set("Unexpected exception " + ex); fail.set(ex);
} }
}); });
thread.setDaemon(true);
thread.start(); thread.start();
threadRunning.await();
// wait for thread to run // interrupt thread, then set result after an optional (random) delay
running.await();
// interrupt thread and set result after an optional (random) delay
thread.interrupt(); thread.interrupt();
long sleepMillis = ThreadLocalRandom.current().nextLong(10);
if (sleepMillis > 0) if (sleepMillis > 0)
Thread.sleep(sleepMillis); Thread.sleep(sleepMillis);
future.complete(null); future.complete(null);
// wait for thread to terminate and check for failure
thread.join(); thread.join();
String failedReason = failed.get(); if (fail.get() != null) {
if (failedReason != null) { throw new AssertionError(
throw new RuntimeException("Test failed: " + failedReason); String.format("Test failed at iteration %d with [timed=%s sleepMillis=%d]",
i, timed, sleepMillis),
fail.get());
} }
} }
} }