8210971: Add exception handling methods to CompletionStage and CompletableFuture

Reviewed-by: martin, chegar
This commit is contained in:
Doug Lea 2018-09-28 08:45:46 -07:00
parent 5d3b3156e8
commit 0b431957a5
3 changed files with 869 additions and 64 deletions

View file

@ -957,17 +957,17 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
@SuppressWarnings("serial")
static final class UniExceptionally<T> extends UniCompletion<T,T> {
Function<? super Throwable, ? extends T> fn;
UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src,
UniExceptionally(Executor executor,
CompletableFuture<T> dep, CompletableFuture<T> src,
Function<? super Throwable, ? extends T> fn) {
super(null, dep, src); this.fn = fn;
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<T> tryFire(int mode) { // never ASYNC
// assert mode != ASYNC;
final CompletableFuture<T> tryFire(int mode) {
CompletableFuture<T> d; CompletableFuture<T> a;
Object r; Function<? super Throwable, ? extends T> f;
if ((d = dep) == null || (f = fn) == null
|| (a = src) == null || (r = a.result) == null
|| !d.uniExceptionally(r, f, this))
|| !d.uniExceptionally(r, f, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
@ -980,11 +980,11 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
Throwable x;
if (result == null) {
try {
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) {
if (c != null && !c.claim())
return false;
if (c != null && !c.claim())
return false;
if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
completeValue(f.apply(x));
} else
else
internalComplete(r);
} catch (Throwable ex) {
completeThrowable(ex);
@ -994,14 +994,88 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
private CompletableFuture<T> uniExceptionallyStage(
Function<Throwable, ? extends T> f) {
Executor e, Function<Throwable, ? extends T> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<T> d = newIncompleteFuture();
Object r;
if ((r = result) == null)
unipush(new UniExceptionally<T>(d, this, f));
else
unipush(new UniExceptionally<T>(e, d, this, f));
else if (e == null)
d.uniExceptionally(r, f, null);
else {
try {
e.execute(new UniExceptionally<T>(null, d, this, f));
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
}
return d;
}
@SuppressWarnings("serial")
static final class UniComposeExceptionally<T> extends UniCompletion<T,T> {
Function<Throwable, ? extends CompletionStage<T>> fn;
UniComposeExceptionally(Executor executor, CompletableFuture<T> dep,
CompletableFuture<T> src,
Function<Throwable, ? extends CompletionStage<T>> fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture<T> tryFire(int mode) {
CompletableFuture<T> d; CompletableFuture<T> a;
Function<Throwable, ? extends CompletionStage<T>> f;
Object r; Throwable x;
if ((d = dep) == null || (f = fn) == null
|| (a = src) == null || (r = a.result) == null)
return null;
if (d.result == null) {
if ((r instanceof AltResult) &&
(x = ((AltResult)r).ex) != null) {
try {
if (mode <= 0 && !claim())
return null;
CompletableFuture<T> g = f.apply(x).toCompletableFuture();
if ((r = g.result) != null)
d.completeRelay(r);
else {
g.unipush(new UniRelay<T,T>(d, g));
if (d.result == null)
return null;
}
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
else
d.internalComplete(r);
}
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}
private CompletableFuture<T> uniComposeExceptionallyStage(
Executor e, Function<Throwable, ? extends CompletionStage<T>> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<T> d = newIncompleteFuture();
Object r, s; Throwable x;
if ((r = result) == null)
unipush(new UniComposeExceptionally<T>(e, d, this, f));
else if (!(r instanceof AltResult) || (x = ((AltResult)r).ex) == null)
d.internalComplete(r);
else
try {
if (e != null)
e.execute(new UniComposeExceptionally<T>(null, d, this, f));
else {
CompletableFuture<T> g = f.apply(x).toCompletableFuture();
if ((s = g.result) != null)
d.result = encodeRelay(s);
else
g.unipush(new UniRelay<T,T>(d, g));
}
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}
@ -1093,7 +1167,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
Object r, s; Throwable x;
if ((r = result) == null)
unipush(new UniCompose<T,V>(e, d, this, f));
else if (e == null) {
else {
if (r instanceof AltResult) {
if ((x = ((AltResult)r).ex) != null) {
d.result = encodeThrowable(x, r);
@ -1102,23 +1176,20 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
r = null;
}
try {
@SuppressWarnings("unchecked") T t = (T) r;
CompletableFuture<V> g = f.apply(t).toCompletableFuture();
if ((s = g.result) != null)
d.result = encodeRelay(s);
if (e != null)
e.execute(new UniCompose<T,V>(null, d, this, f));
else {
g.unipush(new UniRelay<V,V>(d, g));
@SuppressWarnings("unchecked") T t = (T) r;
CompletableFuture<V> g = f.apply(t).toCompletableFuture();
if ((s = g.result) != null)
d.result = encodeRelay(s);
else
g.unipush(new UniRelay<V,V>(d, g));
}
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
}
else
try {
e.execute(new UniCompose<T,V>(null, d, this, f));
} catch (Throwable ex) {
d.result = encodeThrowable(ex);
}
return d;
}
@ -1898,7 +1969,7 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
* Creates a new complete CompletableFuture with given encoded result.
*/
CompletableFuture(Object r) {
this.result = r;
RESULT.setRelease(this, r);
}
/**
@ -2285,28 +2356,36 @@ public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
return this;
}
// not in interface CompletionStage
/**
* Returns a new CompletableFuture that is completed when this
* CompletableFuture completes, with the result of the given
* function of the exception triggering this CompletableFuture's
* completion when it completes exceptionally; otherwise, if this
* CompletableFuture completes normally, then the returned
* CompletableFuture also completes normally with the same value.
* Note: More flexible versions of this functionality are
* available using methods {@code whenComplete} and {@code handle}.
*
* @param fn the function to use to compute the value of the
* returned CompletableFuture if this CompletableFuture completed
* exceptionally
* @return the new CompletableFuture
*/
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
return uniExceptionallyStage(null, fn);
}
public CompletableFuture<T> exceptionallyAsync(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(defaultExecutor(), fn);
}
public CompletableFuture<T> exceptionallyAsync(
Function<Throwable, ? extends T> fn, Executor executor) {
return uniExceptionallyStage(screenExecutor(executor), fn);
}
public CompletableFuture<T> exceptionallyCompose(
Function<Throwable, ? extends CompletionStage<T>> fn) {
return uniComposeExceptionallyStage(null, fn);
}
public CompletableFuture<T> exceptionallyComposeAsync(
Function<Throwable, ? extends CompletionStage<T>> fn) {
return uniComposeExceptionallyStage(defaultExecutor(), fn);
}
public CompletableFuture<T> exceptionallyComposeAsync(
Function<Throwable, ? extends CompletionStage<T>> fn,
Executor executor) {
return uniComposeExceptionallyStage(screenExecutor(executor), fn);
}
/* ------------- Arbitrary-arity constructions -------------- */