8075939: Stream.flatMap() causes breaking of short-circuiting of terminal operations

Reviewed-by: forax, smarks
This commit is contained in:
Paul Sandoz 2017-12-21 13:52:20 -08:00
parent 8c39e16731
commit e1e9023545
6 changed files with 262 additions and 51 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -253,12 +253,14 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
@Override
public void begin(long size) {
downstream.begin(-1);
@ -267,11 +269,27 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstream);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstream);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
}
}
}
}
@Override
public boolean cancellationRequested() {
// If this method is called then an operation within the stream
// pipeline is short-circuiting (see AbstractPipeline.copyInto).
// Note that we cannot differentiate between an upstream or
// downstream operation
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
@ -280,13 +298,17 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<P_OUT, Integer>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
IntConsumer downstreamAsInt = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
@ -295,11 +317,23 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (IntStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsInt);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsInt);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsInt));
}
}
}
}
@Override
public boolean cancellationRequested() {
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
@ -308,13 +342,17 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
Objects.requireNonNull(mapper);
// We can do better than this, by polling cancellationRequested when stream is infinite
return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedReference<P_OUT, Double>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
DoubleConsumer downstreamAsDouble = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
@ -323,11 +361,23 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (DoubleStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsDouble);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsDouble);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsDouble));
}
}
}
}
@Override
public boolean cancellationRequested() {
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};
@ -342,7 +392,12 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<P_OUT, Long>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
// cache the consumer to avoid creation on every accepted element
LongConsumer downstreamAsLong = downstream::accept;
@Override
public void begin(long size) {
downstream.begin(-1);
@ -351,11 +406,23 @@ abstract class ReferencePipeline<P_IN, P_OUT>
@Override
public void accept(P_OUT u) {
try (LongStream result = mapper.apply(u)) {
// We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
if (result != null)
result.sequential().forEach(downstreamAsLong);
if (result != null) {
if (!cancellationRequestedCalled) {
result.sequential().forEach(downstreamAsLong);
}
else {
var s = result.sequential().spliterator();
do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstreamAsLong));
}
}
}
}
@Override
public boolean cancellationRequested() {
cancellationRequestedCalled = true;
return downstream.cancellationRequested();
}
};
}
};