diff --git a/src/java.base/share/classes/java/util/stream/AbstractPipeline.java b/src/java.base/share/classes/java/util/stream/AbstractPipeline.java index 9517bc4f7b8..bba017d4144 100644 --- a/src/java.base/share/classes/java/util/stream/AbstractPipeline.java +++ b/src/java.base/share/classes/java/util/stream/AbstractPipeline.java @@ -134,12 +134,6 @@ abstract class AbstractPipeline> */ private boolean linkedOrConsumed; - /** - * True if there are any stateful ops in the pipeline; only valid for the - * source stage. - */ - private boolean sourceAnyStateful; - private Runnable sourceCloseAction; /** @@ -208,8 +202,6 @@ abstract class AbstractPipeline> this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; - if (opIsStateful()) - sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; } @@ -386,6 +378,21 @@ abstract class AbstractPipeline> return StreamOpFlag.toStreamFlags(combinedFlags); } + /** + * Returns whether any of the stages of the current segment is stateful + * or not. + * @return {@code true} if any stage in this segment is stateful, + * {@code false} if not. + */ + protected final boolean hasAnyStateful() { + var result = false; + for (var u = sourceStage.nextStage; + u != null && !(result = u.opIsStateful()) && u != this; + u = u.nextStage) { + } + return result; + } + /** * Get the source spliterator for this pipeline stage. For a sequential or * stateless parallel pipeline, this is the source spliterator. For a @@ -409,7 +416,7 @@ abstract class AbstractPipeline> throw new IllegalStateException(MSG_CONSUMED); } - if (isParallel() && sourceStage.sourceAnyStateful) { + if (isParallel() && hasAnyStateful()) { // Adapt the source spliterator, evaluating each stateful op // in the pipeline up to and including this pipeline stage. // The depth and flags of each pipeline stage are adjusted accordingly.