8319123: Implement JEP 461: Stream Gatherers (Preview)

Reviewed-by: tvaleev, alanb, psandoz
This commit is contained in:
Viktor Klang 2023-11-30 14:45:23 +00:00 committed by Alan Bateman
parent 04ad98ed32
commit 33b26f79a9
24 changed files with 4988 additions and 7 deletions

View file

@ -85,7 +85,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
* The "upstream" pipeline, or null if this is the source stage.
*/
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
protected final AbstractPipeline previousStage;
/**
* The operation flags for the intermediate operation represented by this
@ -188,9 +188,13 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
* Constructor for appending an intermediate operation stage onto an
* existing pipeline.
*
* The previous stage must be unlinked and unconsumed.
*
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
* @throws IllegalStateException if previousStage is already linked or
* consumed
*/
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
@ -205,6 +209,41 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
this.depth = previousStage.depth + 1;
}
/**
* Constructor for replacing an intermediate operation stage onto an
* existing pipeline.
*
* @param previousPreviousStage the upstream pipeline stage of the upstream pipeline stage
* @param previousStage the upstream pipeline stage
* @param opFlags the operation flags for the new stage, described in
* {@link StreamOpFlag}
* @throws IllegalStateException if previousStage is already linked or
* consumed
*/
protected AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousPreviousStage, AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed || !previousPreviousStage.linkedOrConsumed || previousPreviousStage.nextStage != previousStage || previousStage.previousStage != previousPreviousStage)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousPreviousStage.nextStage = this;
this.previousStage = previousPreviousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousPreviousStage.combinedFlags);
this.sourceStage = previousPreviousStage.sourceStage;
this.depth = previousPreviousStage.depth + 1;
}
/**
 * Marks this stage as linked or consumed, failing if it was already
 * marked as such.
 *
 * @throws IllegalStateException if this stage has already been linked or
 * consumed
 */
protected void linkOrConsume() {
    if (!linkedOrConsumed) {
        linkedOrConsumed = true;
        return;
    }
    throw new IllegalStateException(MSG_STREAM_LINKED);
}
// Terminal evaluation methods
@ -402,7 +441,7 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
* operation.
*/
@SuppressWarnings("unchecked")
private Spliterator<?> sourceSpliterator(int terminalFlags) {
protected Spliterator<?> sourceSpliterator(int terminalFlags) {
// Get the source spliterator of the pipeline
Spliterator<?> spliterator = null;
if (sourceStage.sourceSpliterator != null) {
@ -740,6 +779,6 @@ abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
@SuppressWarnings("unchecked")
<P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator) {
return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
return opEvaluateParallel(helper, spliterator, Nodes.castingArray()).spliterator();
}
}

View file

@ -0,0 +1,593 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import jdk.internal.javac.PreviewFeature;
import jdk.internal.vm.annotation.ForceInline;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
/**
* An intermediate operation that transforms a stream of input elements into a
* stream of output elements, optionally applying a final action when the end of
* the upstream is reached. The transformation may be stateless or stateful,
* and may buffer input before producing any output.
*
* <p>Gatherer operations can be performed either sequentially,
* or be parallelized -- if a combiner function is supplied.
*
* <p>There are many examples of gathering operations, including but not
* limited to:
* grouping elements into batches (windowing functions);
* de-duplicating consecutively similar elements; incremental accumulation
* functions (prefix scan); incremental reordering functions, etc. The class
* {@link java.util.stream.Gatherers} provides implementations of common
* gathering operations.
*
* @apiNote
* <p>A {@code Gatherer} is specified by four functions that work together to
* process input elements, optionally using intermediate state, and optionally
* perform a final action at the end of input. They are: <ul>
* <li>creating a new, potentially mutable, state ({@link #initializer()})</li>
* <li>integrating a new input element ({@link #integrator()})</li>
* <li>combining two states into one ({@link #combiner()})</li>
* <li>performing an optional final action ({@link #finisher()})</li>
* </ul>
*
* <p>Each invocation of {@link #initializer()}, {@link #integrator()},
* {@link #combiner()}, and {@link #finisher()} must return a semantically
* identical result.
*
* <p>Implementations of Gatherer must not capture, retain, or expose to
* other threads, the references to the state instance, or the downstream
* {@link Downstream} for longer than the invocation duration of the method
* which they are passed to.
*
* <p>Performing a gathering operation with a {@code Gatherer} should produce a
* result equivalent to:
*
* {@snippet lang = java:
* Gatherer.Downstream<? super R> downstream = ...;
* A state = gatherer.initializer().get();
* for (T t : data) {
* gatherer.integrator().integrate(state, t, downstream);
* }
* gatherer.finisher().accept(state, downstream);
* }
*
* <p>However, the library is free to partition the input, perform the
* integrations on the partitions, and then use the combiner function to
* combine the partial results to achieve a gathering operation. (Whether
* this performs better or worse depends on the specific gathering operation
* and on the relative cost of the integrator and combiner functions.)
*
* <p>In addition to the predefined implementations in {@link Gatherers}, the
* static factory methods {@code of(...)} and {@code ofSequential(...)}
* can be used to construct gatherers. For example, you could create a gatherer
* that implements the equivalent of
* {@link java.util.stream.Stream#map(java.util.function.Function)} with:
*
* {@snippet lang = java:
* public static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
* return Gatherer.of(
* (unused, element, downstream) -> // integrator
* downstream.push(mapper.apply(element))
* );
* }
* }
*
* <p>Gatherers are designed to be <em>composed</em>; two or more Gatherers can
* be composed into a single Gatherer using the {@link #andThen(Gatherer)}
* method.
*
* {@snippet lang = java:
* // using the implementation of `map` as seen above
* Gatherer<Integer, ?, Integer> increment = map(i -> i + 1);
*
* Gatherer<Object, ?, String> toString = map(i -> i.toString());
*
* Gatherer<Integer, ?, String> incrementThenToString = increment.andThen(toString);
* }
*
* <p>As an example, a Gatherer implementing a sequential Prefix Scan could
* be done the following way:
*
* {@snippet lang = java:
* public static <T, R> Gatherer<T, ?, R> scan(
* Supplier<R> initial,
* BiFunction<? super R, ? super T, ? extends R> scanner) {
*
* class State {
* R current = initial.get();
* }
*
* return Gatherer.<T, State, R>ofSequential(
* State::new,
* Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
* state.current = scanner.apply(state.current, element);
* return downstream.push(state.current);
* })
* );
* }
* }
*
* <p>Example of usage:
*
* {@snippet lang = java:
* // will contain: ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]
* List<String> numberStrings =
* Stream.of(1,2,3,4,5,6,7,8,9)
* .gather(
* scan(() -> "", (string, number) -> string + number)
* )
* .toList();
* }
*
* @implSpec Libraries that implement transformations based on {@code Gatherer},
* such as {@link Stream#gather(Gatherer)}, must adhere to the following
* constraints:
* <ul>
* <li>Gatherers whose initializer is {@link #defaultInitializer()} are
* considered to be stateless, and invoking their initializer is optional.
* </li>
* <li>Gatherers whose integrator is an instance of {@link Integrator.Greedy}
* can be assumed not to short-circuit, and the return value of invoking
* {@link Integrator#integrate(Object, Object, Downstream)} does not need to
* be inspected.</li>
* <li>The first argument passed to the integration function, both
* arguments passed to the combiner function, and the argument passed to the
* finisher function must be the result of a previous invocation of the
* initializer or combiner functions.</li>
* <li>The implementation should not do anything with the result of any of
* the initializer or combiner functions other than to
* pass them again to the integrator, combiner, or finisher functions.</li>
* <li>Once a state object is passed to the combiner or finisher function,
* it is never passed to the integrator function again.</li>
* <li>When the integrator function returns {@code false},
* it shall be interpreted just as if there were no more elements to pass
* it.</li>
* <li>For parallel evaluation, the gathering implementation must manage
* that the input is properly partitioned, that partitions are processed
* in isolation, and combining happens only after integration is complete
* for both partitions.</li>
* <li>Gatherers whose combiner is {@link #defaultCombiner()} may only be
* evaluated sequentially. All other combiners allow the operation to be
* parallelized by initializing each partition separately, invoking
* the integrator until it returns {@code false}, then joining each
* partition's state using the combiner, and then invoking the finisher on
* the joined state. Outputs and state later in the input sequence will
* be discarded if processing an earlier partition short-circuits.</li>
* <li>Gatherers whose finisher is {@link #defaultFinisher()} are considered
* to not have an end-of-stream hook and invoking their finisher is
* optional.</li>
* </ul>
*
* @see Stream#gather(Gatherer)
* @see Gatherers
*
* @param <T> the type of input elements to the gatherer operation
* @param <A> the potentially mutable state type of the gatherer operation
* (often hidden as an implementation detail)
* @param <R> the type of output elements from the gatherer operation
* @since 22
*/
@PreviewFeature(feature = PreviewFeature.Feature.STREAM_GATHERERS)
public interface Gatherer<T, A, R> {
/**
 * A function that produces an instance of the intermediate state used for
 * this gathering operation.
 *
 * @implSpec The implementation in this interface returns
 * {@link #defaultInitializer()}.
 *
 * @return A function that produces an instance of the intermediate state
 * used for this gathering operation
 */
default Supplier<A> initializer() {
    return defaultInitializer();
}
/**
* A function which integrates provided elements, potentially using
* the provided intermediate state, optionally producing output to the
* provided {@link Downstream}.
*
* <p>This is the only abstract method of this interface; the initializer,
* combiner, and finisher all have default implementations.
*
* @return a function which integrates provided elements, potentially using
* the provided state, optionally producing output to the provided
* Downstream
*/
Integrator<A, T, R> integrator();
/**
* A function which accepts two intermediate states and combines them into
* one.
*
* <p>A Gatherer whose combiner is {@link #defaultCombiner()} may only be
* evaluated sequentially.
*
* @implSpec The implementation in this interface returns
* {@link #defaultCombiner()}.
*
* @return a function which accepts two intermediate states and combines
* them into one
*/
default BinaryOperator<A> combiner() {
return defaultCombiner();
}
/**
* A function which accepts the final intermediate state
* and a {@link Downstream} object, allowing a final action to be performed
* at the end of input elements.
*
* @implSpec The implementation in this interface returns
* {@link #defaultFinisher()}.
*
* @return a function which transforms the intermediate result to the final
* result(s) which are then passed on to the provided Downstream
*/
default BiConsumer<A, Downstream<? super R>> finisher() {
return defaultFinisher();
}
/**
* Returns a composed Gatherer which connects the output of this Gatherer
* to the input of that Gatherer.
*
* @implSpec The implementation in this interface returns a new Gatherer
* which is semantically equivalent to the combination of
* {@code this} and {@code that} gatherer.
*
* @param that the other gatherer
* @param <RR> The type of output of that Gatherer
* @throws NullPointerException if the argument is {@code null}
* @return a composed Gatherer which connects the output of this
* Gatherer as input to that Gatherer
*/
default <RR> Gatherer<T, ?, RR> andThen(Gatherer<? super R, ?, ? extends RR> that) {
// Fail fast on null before handing over to the composition factory
Objects.requireNonNull(that);
return Gatherers.Composite.of(this, that);
}
/**
* Returns an initializer which is the default initializer of a Gatherer.
* The returned initializer identifies that the owner Gatherer is stateless.
*
* @implSpec This method always returns the same instance, so the default
* initializer can be recognized with a reference comparison.
*
* @see Gatherer#initializer()
* @return the instance of the default initializer
* @param <A> the type of the state of the returned initializer
*/
static <A> Supplier<A> defaultInitializer() {
return Gatherers.Value.DEFAULT.initializer();
}
/**
* Returns a combiner which is the default combiner of a Gatherer.
* The returned combiner identifies that the owning Gatherer must only
* be evaluated sequentially.
*
* @implSpec This method always returns the same instance.
*
* @see Gatherer#combiner()
* @return the instance of the default combiner
* @param <A> the type of the state of the returned combiner
*/
static <A> BinaryOperator<A> defaultCombiner() {
return Gatherers.Value.DEFAULT.combiner();
}
/**
* Returns a {@code finisher} which is the default finisher of
* a {@code Gatherer}.
* The returned finisher identifies that the owning Gatherer performs
* no additional actions at the end of input.
*
* @implSpec This method always returns the same instance, so the default
* finisher can be recognized with a reference comparison.
*
* @see Gatherer#finisher()
* @return the instance of the default finisher
* @param <A> the type of the state of the returned finisher
* @param <R> the type of the Downstream of the returned finisher
*/
static <A, R> BiConsumer<A, Downstream<? super R>> defaultFinisher() {
return Gatherers.Value.DEFAULT.finisher();
}
/**
 * Creates a stateless {@code Gatherer} that may only be evaluated
 * sequentially, built from the given {@code integrator} and the default
 * initializer, combiner, and finisher.
 *
 * @param integrator the integrator function for the new gatherer
 * @param <T> the type of input elements for the new gatherer
 * @param <R> the type of results for the new gatherer
 * @throws NullPointerException if the argument is {@code null}
 * @return the new {@code Gatherer}
 */
static <T, R> Gatherer<T, Void, R> ofSequential(
        Integrator<Void, T, R> integrator) {
    final Supplier<Void> stateless = defaultInitializer();
    final BinaryOperator<Void> sequentialOnly = defaultCombiner();
    final BiConsumer<Void, Downstream<? super R>> noFinish = defaultFinisher();
    return of(stateless, integrator, sequentialOnly, noFinish);
}
/**
 * Creates a stateless {@code Gatherer} that may only be evaluated
 * sequentially, built from the given {@code integrator} and
 * {@code finisher} and the default initializer and combiner.
 *
 * @param integrator the integrator function for the new gatherer
 * @param finisher the finisher function for the new gatherer
 * @param <T> the type of input elements for the new gatherer
 * @param <R> the type of results for the new gatherer
 * @throws NullPointerException if any argument is {@code null}
 * @return the new {@code Gatherer}
 */
static <T, R> Gatherer<T, Void, R> ofSequential(
        Integrator<Void, T, R> integrator,
        BiConsumer<Void, Downstream<? super R>> finisher) {
    final Supplier<Void> stateless = defaultInitializer();
    final BinaryOperator<Void> sequentialOnly = defaultCombiner();
    return of(stateless, integrator, sequentialOnly, finisher);
}
/**
 * Creates a {@code Gatherer} that may only be evaluated sequentially,
 * built from the given {@code initializer} and {@code integrator} and the
 * default combiner and finisher.
 *
 * @param initializer the initializer function for the new gatherer
 * @param integrator the integrator function for the new gatherer
 * @param <T> the type of input elements for the new gatherer
 * @param <A> the type of state for the new gatherer
 * @param <R> the type of results for the new gatherer
 * @throws NullPointerException if any argument is {@code null}
 * @return the new {@code Gatherer}
 */
static <T, A, R> Gatherer<T, A, R> ofSequential(
        Supplier<A> initializer,
        Integrator<A, T, R> integrator) {
    final BinaryOperator<A> sequentialOnly = defaultCombiner();
    final BiConsumer<A, Downstream<? super R>> noFinish = defaultFinisher();
    return of(initializer, integrator, sequentialOnly, noFinish);
}
/**
 * Creates a {@code Gatherer} that may only be evaluated sequentially,
 * built from the given {@code initializer}, {@code integrator}, and
 * {@code finisher} and the default combiner.
 *
 * @param initializer the initializer function for the new gatherer
 * @param integrator the integrator function for the new gatherer
 * @param finisher the finisher function for the new gatherer
 * @param <T> the type of input elements for the new gatherer
 * @param <A> the type of state for the new gatherer
 * @param <R> the type of results for the new gatherer
 * @throws NullPointerException if any argument is {@code null}
 * @return the new {@code Gatherer}
 */
static <T, A, R> Gatherer<T, A, R> ofSequential(
        Supplier<A> initializer,
        Integrator<A, T, R> integrator,
        BiConsumer<A, Downstream<? super R>> finisher) {
    final BinaryOperator<A> sequentialOnly = defaultCombiner();
    return of(initializer, integrator, sequentialOnly, finisher);
}
/**
 * Creates a parallelizable, stateless {@code Gatherer} built from the
 * given {@code integrator}, the default initializer and finisher, and the
 * shared stateless combiner.
 *
 * @param integrator the integrator function for the new gatherer
 * @param <T> the type of input elements for the new gatherer
 * @param <R> the type of results for the new gatherer
 * @throws NullPointerException if the argument is {@code null}
 * @return the new {@code Gatherer}
 */
static <T, R> Gatherer<T, Void, R> of(Integrator<Void, T, R> integrator) {
    final Supplier<Void> stateless = defaultInitializer();
    final BiConsumer<Void, Downstream<? super R>> noFinish = defaultFinisher();
    return of(stateless, integrator, Gatherers.Value.DEFAULT.statelessCombiner, noFinish);
}
/**
 * Creates a parallelizable, stateless {@code Gatherer} built from the
 * given {@code integrator} and {@code finisher}, the default initializer,
 * and the shared stateless combiner.
 *
 * @param integrator the integrator function for the new gatherer
 * @param finisher the finisher function for the new gatherer
 * @param <T> the type of input elements for the new gatherer
 * @param <R> the type of results for the new gatherer
 * @throws NullPointerException if any argument is {@code null}
 * @return the new {@code Gatherer}
 */
static <T, R> Gatherer<T, Void, R> of(
        Integrator<Void, T, R> integrator,
        BiConsumer<Void, Downstream<? super R>> finisher) {
    final Supplier<Void> stateless = defaultInitializer();
    return of(stateless, integrator, Gatherers.Value.DEFAULT.statelessCombiner, finisher);
}
/**
 * Creates a parallelizable {@code Gatherer} from the four given functions:
 * {@code initializer}, {@code integrator}, {@code combiner} and
 * {@code finisher}.
 *
 * @param initializer the initializer function for the new gatherer
 * @param integrator the integrator function for the new gatherer
 * @param combiner the combiner function for the new gatherer
 * @param finisher the finisher function for the new gatherer
 * @param <T> the type of input elements for the new gatherer
 * @param <A> the type of state for the new gatherer
 * @param <R> the type of results for the new gatherer
 * @throws NullPointerException if any argument is {@code null}
 * @return the new {@code Gatherer}
 */
static <T, A, R> Gatherer<T, A, R> of(
        Supplier<A> initializer,
        Integrator<A, T, R> integrator,
        BinaryOperator<A> combiner,
        BiConsumer<A, Downstream<? super R>> finisher) {
    // Validate in declaration order, so the first null argument is reported
    Objects.requireNonNull(initializer);
    Objects.requireNonNull(integrator);
    Objects.requireNonNull(combiner);
    Objects.requireNonNull(finisher);
    return new Gatherers.GathererImpl<>(initializer, integrator, combiner, finisher);
}
/**
* A Downstream object is the next stage in a pipeline of operations,
* to which elements can be sent.
*
* @param <T> the type of elements this downstream accepts
* @since 22
*/
@FunctionalInterface
@PreviewFeature(feature = PreviewFeature.Feature.STREAM_GATHERERS)
interface Downstream<T> {
/**
* Pushes, if possible, the provided element downstream -- to the next
* stage in the pipeline.
*
* @implSpec If this method returns {@code false} then no further
* elements will be accepted and subsequent invocations of this method
* will return {@code false}.
*
* @param element the element to push downstream
* @return {@code true} if more elements can be sent,
* and {@code false} if not.
*/
boolean push(T element);
/**
* Checks whether the next stage is known to not want
* any more elements sent to it.
*
* @apiNote This is best-effort only; once this returns {@code true} it
* should never return {@code false} again for the same instance.
*
* @implSpec The implementation in this interface returns {@code false}.
*
* @return {@code true} if this Downstream is known not to want any
* more elements sent to it, {@code false} otherwise
*/
default boolean isRejecting() { return false; }
}
/**
* An Integrator receives elements and processes them,
* optionally using the supplied state, and optionally sends incremental
* results downstream.
*
* @param <A> the type of state used by this integrator
* @param <T> the type of elements this integrator consumes
* @param <R> the type of results this integrator can produce
* @since 22
*/
@FunctionalInterface
@PreviewFeature(feature = PreviewFeature.Feature.STREAM_GATHERERS)
interface Integrator<A, T, R> {
/**
* Performs an action given: the current state, the next element, and
* a downstream object; potentially inspecting and/or updating
* the state, optionally sending any number of elements downstream
* -- and then returns whether more elements are to be consumed or not.
*
* @param state The state to integrate into
* @param element The element to integrate
* @param downstream The downstream object of this integration
* @return {@code true} if subsequent integration is desired,
* {@code false} if not
*/
boolean integrate(A state, T element, Downstream<? super R> downstream);
/**
* Factory method for turning Integrator-shaped lambdas into
* Integrators.
*
* <p>The argument is returned as-is; the method exists to give a lambda
* expression the {@code Integrator} target type.
*
* @param integrator a lambda to be typed as an Integrator
* @return the given lambda as an Integrator
* @param <A> the type of state used by this integrator
* @param <T> the type of elements this integrator receives
* @param <R> the type of results this integrator can produce
*/
@ForceInline
static <A, T, R> Integrator<A, T, R> of(Integrator<A, T, R> integrator) {
return integrator;
}
/**
* Factory method for turning Integrator-shaped lambdas into
* {@link Greedy} Integrators.
*
* <p>The argument is returned as-is; the method exists to give a lambda
* expression the {@code Greedy} target type.
*
* @param greedy a lambda to be typed as an Integrator.Greedy
* @return the given lambda as a Greedy Integrator
* @param <A> the type of state used by this integrator
* @param <T> the type of elements this integrator receives
* @param <R> the type of results this integrator can produce
*/
@ForceInline
static <A, T, R> Greedy<A, T, R> ofGreedy(Greedy<A, T, R> greedy) {
return greedy;
}
/**
* Greedy Integrators consume all their input, and may only relay that
* the downstream does not want more elements.
*
* @implSpec This interface is used to communicate that no
* short-circuiting will be <i>initiated</i> by this Integrator, and that
* information can then be used to optimize evaluation.
*
* @param <A> the type of state used by this integrator
* @param <T> the type of elements this greedy integrator receives
* @param <R> the type of results this greedy integrator can produce
* @since 22
*/
@FunctionalInterface
@PreviewFeature(feature = PreviewFeature.Feature.STREAM_GATHERERS)
interface Greedy<A, T, R> extends Integrator<A, T, R> { }
}
}

View file

@ -0,0 +1,754 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import jdk.internal.vm.annotation.ForceInline;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.function.ToIntFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Gatherer.Integrator;
/**
* Runtime machinery for evaluating Gatherers under different modes.
* The performance-critical code below contains some more complicated encodings:
* therefore, make sure to run benchmarks to verify changes to prevent regressions.
*
* @since 22
*/
final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {
/*
 * Attaches a gather-operation to the given upstream stage.
 *
 * If the upstream stage is itself a GathererOp, its gatherer is composed
 * with the new one via `andThen` and a single fused stage is created from
 * the upstream stage; otherwise a new stage is appended after `upstream`.
 */
@SuppressWarnings("unchecked")
static <P_IN, P_OUT extends T, T, A, R> Stream<R> of(
        ReferencePipeline<P_IN, P_OUT> upstream,
        Gatherer<T, A, R> gatherer) {
    if (upstream.getClass() != GathererOp.class) {
        // Any other kind of stage: append a fresh gather stage
        return new GathererOp<>(
                (ReferencePipeline<?, T>) upstream,
                gatherer);
    }
    // When attaching a gather-operation onto another gather-operation,
    // we can fuse them into one
    return new GathererOp<>(
            ((GathererOp<P_IN, Object, P_OUT>) upstream).gatherer.andThen(gatherer),
            (GathererOp<?, ?, P_IN>) upstream);
}
/*
* GathererOp.NodeBuilder is a lazy accumulator of elements with O(1)
* `append`, and O(8) `join` (concat).
*
* First `append` inflates a growable Builder, the O(8) for `join` is
* because we prefer to delegate to `append` for small concatenations to
* avoid excessive indirections (unbalanced Concat-trees) when joining many
* NodeBuilders together.
*
* Note: `append` above refers to the `accept` method of this class.
*/
static final class NodeBuilder<X> implements Consumer<X> {
// On join, a right-hand Builder holding fewer than this many elements is
// appended element by element instead of being concatenated
private static final int LINEAR_APPEND_MAX = 8; // TODO revisit
// Growable buffer that is also usable directly as a Node
static final class Builder<X> extends SpinedBuffer<X> implements Node<X> {
Builder() {
}
}
NodeBuilder() {
}
// Buffer receiving newly accepted elements; null until the first accept,
// and reset to null by build()
private Builder<X> rightMost;
// Result of previous build()/join() calls, positioned before rightMost;
// null if nothing has been built yet
private Node<X> leftMost;
private boolean isEmpty() {
return rightMost == null && leftMost == null;
}
// Appends one element, lazily allocating the current buffer
@Override
public void accept(X x) {
final var b = rightMost;
(b == null ? (rightMost = new NodeBuilder.Builder<>()) : b).accept(x);
}
// Appends the contents of `that` after the contents of this builder and
// returns the builder holding the combined result: `that` if this builder
// is empty, otherwise `this`. Calls that.build() when `that` is non-empty.
public NodeBuilder<X> join(NodeBuilder<X> that) {
if (isEmpty())
return that;
if (!that.isEmpty()) {
final var tb = that.build();
if (rightMost != null && tb instanceof NodeBuilder.Builder<X>
&& tb.count() < LINEAR_APPEND_MAX)
tb.forEach(this); // Avoid conc for small nodes
else
leftMost = Nodes.conc(StreamShape.REFERENCE, this.build(), tb);
}
return this;
}
// Folds the current buffer (if any) into leftMost and returns the node
// holding everything accumulated so far; an empty node if nothing was added
public Node<X> build() {
if (isEmpty())
return Nodes.emptyNode(StreamShape.REFERENCE);
final var rm = rightMost;
if (rm != null) {
rightMost = null; // Make sure builder isn't reused
final var lm = leftMost;
leftMost = (lm == null) ? rm : Nodes.conc(StreamShape.REFERENCE, lm, rm);
}
return leftMost;
}
}
/*
* Sink which feeds incoming elements to a Gatherer's integrator, and which
* is itself the Gatherer.Downstream handed to the integrator and finisher:
* pushed elements are forwarded to the wrapped sink.
*/
static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {
private final Sink<R> sink;
private final Gatherer<T, A, R> gatherer;
private final Integrator<A, T, R> integrator; // Optimization: reuse
// Stays null when the gatherer uses the default initializer
private A state;
// Becomes false once the integrator returns false or the wrapped sink
// is observed to request cancellation; never flips back to true
private boolean proceed = true;
GatherSink(Gatherer<T, A, R> gatherer, Sink<R> sink) {
this.gatherer = gatherer;
this.sink = sink;
this.integrator = gatherer.integrator();
}
// java.util.stream.Sink contract below:
@Override
public void begin(long size) {
final var initializer = gatherer.initializer();
if (initializer != Gatherer.defaultInitializer()) // Optimization
state = initializer.get();
sink.begin(size);
}
@Override
public void accept(T t) {
/* Benchmarks have indicated that doing an unconditional write to
* `proceed` is more efficient than branching.
* We use `&=` here to prevent flips from `false` -> `true`.
*
* As of writing this, taking `greedy` or `stateless` into
* consideration at this point doesn't yield any performance gains.
*/
proceed &= integrator.integrate(state, t, this);
}
@Override
public boolean cancellationRequested() {
return cancellationRequested(proceed);
}
// Returns true if `knownProceed` is false or the wrapped sink requests
// cancellation; in the latter case `proceed` is also set to false (the
// assignment expression evaluates to false, completing the condition)
private boolean cancellationRequested(boolean knownProceed) {
// Highly performance sensitive
return !(knownProceed && (!sink.cancellationRequested() || (proceed = false)));
}
@Override
public void end() {
final var finisher = gatherer.finisher();
if (finisher != Gatherer.<A, R>defaultFinisher()) // Optimization
finisher.accept(state, this);
sink.end();
state = null; // GC assistance
}
// Gatherer.Downstream contract below:
@Override
public boolean isRejecting() {
return !proceed;
}
// Forwards the element to the wrapped sink only while `proceed` is true;
// returns whether further elements are still wanted
@Override
public boolean push(R r) {
var p = proceed;
if (p)
sink.accept(r);
return !cancellationRequested(p);
}
}
/*
 * Selects the operation flags for a gather stage: Greedy integrators get
 * GREEDY_FLAGS, all other integrators get SHORT_CIRCUIT_FLAGS.
 */
private static int opFlagsFor(Integrator<?, ?, ?> integrator) {
    if (integrator instanceof Integrator.Greedy<?, ?, ?>) {
        return GREEDY_FLAGS;
    }
    return SHORT_CIRCUIT_FLAGS;
}
// Flags common to every gather stage: the output is flagged as not sorted,
// not distinct, and not sized
private static final int DEFAULT_FLAGS =
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT |
StreamOpFlag.NOT_SIZED;
// Flags used when the integrator is not Greedy: additionally marks the
// stage as short-circuiting
private static final int SHORT_CIRCUIT_FLAGS =
DEFAULT_FLAGS | StreamOpFlag.IS_SHORT_CIRCUIT;
// Flags used when the integrator is Greedy: currently just DEFAULT_FLAGS
private static final int GREEDY_FLAGS =
DEFAULT_FLAGS;
// The gatherer evaluated by this stage; for a fused stage this is the
// composition of the fused gatherers
final Gatherer<T, A, R> gatherer;
/*
* This constructor is used for initial .gather() invocations
*
* The new stage is appended after `upstream`; its operation flags depend
* on whether the gatherer's integrator is Greedy (see opFlagsFor).
*/
private GathererOp(ReferencePipeline<?, T> upstream, Gatherer<T, A, R> gatherer) {
/* TODO this is a prime spot for pre-super calls to make sure that
* we only need to call `integrator()` once.
*/
super(upstream, opFlagsFor(gatherer.integrator()));
this.gatherer = gatherer;
}
/*
* This constructor is used when fusing subsequent .gather() invocations
*
* `gatherer` is expected to already be the composition of the upstream
* gatherer and the newly attached one. The new stage is created from the
* stage preceding `upstream` together with `upstream` itself
* (presumably replacing `upstream` in the pipeline -- the super
* constructor is not shown here).
*/
@SuppressWarnings("unchecked")
private GathererOp(Gatherer<T, A, R> gatherer, GathererOp<?, ?, T> upstream) {
super((AbstractPipeline<?, T, ?>) upstream.upstream(),
upstream,
opFlagsFor(gatherer.integrator()));
this.gatherer = gatherer;
}
/* This allows internal access to the previous stage,
* to be able to fuse `gather` followed by `collect`.
*/
@SuppressWarnings("unchecked")
private AbstractPipeline<?, T, ?> upstream() {
    // `previousStage` is declared as a raw AbstractPipeline, hence the
    // unchecked cast to the element type this stage consumes.
    final AbstractPipeline<?, T, ?> previous =
        (AbstractPipeline<?, T, ?>) super.previousStage;
    return previous;
}
@Override
boolean opIsStateful() {
// TODO
/* Currently GathererOp is always reported as stateful, regardless of the
* gatherer it wraps. What could be tried instead is:
* return gatherer.initializer() != Gatherer.defaultInitializer()
* || gatherer.combiner() == Gatherer.defaultCombiner()
* || gatherer.finisher() != Gatherer.defaultFinisher();
*/
return true;
}
@Override
Sink<T> opWrapSink(int flags, Sink<R> downstream) {
    // Wrap the downstream sink so that elements pass through the gatherer
    // before reaching it; `flags` is not consulted.
    final Sink<T> gathering = new GatherSink<>(gatherer, downstream);
    return gathering;
}
/*
* This is used when evaluating .gather() operations interspersed with
* other Stream operations (in parallel)
*/
@Override
<I> Node<R> opEvaluateParallel(PipelineHelper<R> unused1,
                               Spliterator<I> spliterator,
                               IntFunction<R[]> unused2) {
    // Run the upstream stages over the supplied spliterator, then gather
    // in parallel and materialize the output into a Node via NodeBuilder.
    final Spliterator<T> upstreamElements =
        upstream().wrapSpliterator(spliterator);
    return this.<NodeBuilder<R>, Node<R>>evaluate(
        upstreamElements,
        true,                // always evaluated as parallel
        gatherer,
        NodeBuilder::new,    // supplier
        NodeBuilder::accept, // accumulator
        NodeBuilder::join,   // combiner
        NodeBuilder::build   // finisher
    );
}
@Override
<P_IN> Spliterator<R> opEvaluateParallelLazy(PipelineHelper<R> helper,
                                             Spliterator<P_IN> spliterator) {
    /*
     * Only a very small subset of possible Gatherers could be expressed
     * as Spliterators directly, namely those where
     *  - the initializer is Gatherer.defaultInitializer(),
     *  - the combiner is NOT Gatherer.defaultCombiner(), and
     *  - the finisher is Gatherer.defaultFinisher().
     * Hence the operation is evaluated eagerly into a Node, and that
     * Node's spliterator is handed back.
     */
    final Node<R> evaluated = opEvaluateParallel(null, spliterator, null);
    return evaluated.spliterator();
}
/* gather-operations immediately followed by (terminal) collect-operations
* are fused together to avoid having to first run the gathering to
* completion and only after that be able to run the collection on top of
* the output. This is highly beneficial in the parallel case as stateful
* operations cannot be pipelined in the ReferencePipeline implementation.
* Overriding collect-operations overcomes this limitation.
*/
@Override
public <CR, CA> CR collect(Collector<? super R, CA, CR> c) {
    linkOrConsume(); // Important for structural integrity
    final boolean inParallel = isParallel();
    final AbstractPipeline<?, T, ?> source = upstream();
    final Spliterator<T> elements =
        source.wrapSpliterator(source.sourceSpliterator(0));

    final Supplier<CA> supplier = c.supplier();
    final BiConsumer<CA, ? super R> accumulator = c.accumulator();

    // The collector's combiner is only needed for parallel evaluation
    BinaryOperator<CA> combiner = null;
    if (inParallel) {
        combiner = c.combiner();
    }

    // A `null` finisher tells evaluate(...) that the collector has the
    // IDENTITY_FINISH characteristic
    Function<CA, CR> finisher = null;
    if (!c.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
        finisher = c.finisher();
    }

    return evaluate(elements, inParallel, gatherer,
                    supplier, accumulator, combiner, finisher);
}
@Override
public <RR> RR collect(Supplier<RR> supplier,
                       BiConsumer<RR, RR> combiner) {
    linkOrConsume(); // Important for structural integrity
    final boolean inParallel = isParallel();
    final AbstractPipeline<?, T, ?> source = upstream();
    final Spliterator<T> elements =
        source.wrapSpliterator(source.sourceSpliterator(0));

    // Adapt the BiConsumer-style combiner to the BinaryOperator shape used
    // by evaluate(...): merge the right container into the left one and
    // hand back the left. It is only needed for parallel evaluation.
    final BinaryOperator<RR> merger;
    if (inParallel) {
        merger = (left, right) -> {
            combiner.accept(left, right);
            return left;
        };
    } else {
        merger = null;
    }

    // No finisher: the result container itself is the result
    return this.<RR, RR>evaluate(elements, inParallel, gatherer,
                                 supplier, accumulator, merger, null);
}
/*
* evaluate(...) is the primary execution mechanism besides opWrapSink()
* and implements sequential, hybrid parallel-sequential, and
* parallel evaluation.
*/
/*
* Evaluates the given gatherer fused with collector-style functions.
*
* spliterator          - the elements to gather
* parallel             - false selects the Sequential strategy; true selects
*                        Parallel when the gatherer has a non-default
*                        combiner, and Hybrid otherwise
* gatherer             - the gatherer to evaluate
* collectorSupplier    - creates the result container
* collectorAccumulator - folds each gathered element into the container
* collectorCombiner    - merges two containers; only invoked by the
*                        Parallel strategy
* collectorFinisher    - converts the container into the result; `null`
*                        means the container itself is the result
*/
private <CA, CR> CR evaluate(final Spliterator<T> spliterator,
final boolean parallel,
final Gatherer<T, A, R> gatherer,
final Supplier<CA> collectorSupplier,
final BiConsumer<CA, ? super R> collectorAccumulator,
final BinaryOperator<CA> collectorCombiner,
final Function<CA, CR> collectorFinisher) {
// There are two main sections here: sequential and parallel
final var initializer = gatherer.initializer();
final var integrator = gatherer.integrator();
// Optimization
final boolean greedy = integrator instanceof Integrator.Greedy<A, T, R>;
// Sequential evaluation section starts here.
// Sequential is the fusion of a Gatherer and a Collector which can
// be evaluated sequentially.
final class Sequential implements Consumer<T>, Gatherer.Downstream<R> {
A state;
CA collectorState;
boolean proceed;
Sequential() {
// The default initializer is never invoked; `state` then stays null
if (initializer != Gatherer.defaultInitializer())
state = initializer.get();
collectorState = collectorSupplier.get();
proceed = true;
}
@ForceInline
Sequential evaluateUsing(Spliterator<T> spliterator) {
// Greedy integrators consume everything in bulk; all others are fed
// one element at a time so that evaluation stops once `proceed`
// becomes false.
if (greedy)
spliterator.forEachRemaining(this);
else
do {
} while (proceed && spliterator.tryAdvance(this));
return this;
}
/*
* No need to override isRejecting() as it is not overridden here
* and push(...) below always returns `true`: collectors can never
* short-circuit.
*/
@Override
public boolean push(R r) {
collectorAccumulator.accept(collectorState, r);
return true;
}
@Override
public void accept(T t) {
/*
* Benchmarking has shown that, in this case, conditional
* writing of `proceed` is desirable and if that was not the
* case, then the following line would've been clearer:
*
* proceed &= integrator.integrate(state, t, this);
*
* Note that `proceed` is only ever written (to false) when the
* integrator is not greedy.
*/
var ignore = integrator.integrate(state, t, this)
|| (!greedy && (proceed = false));
}
@SuppressWarnings("unchecked")
public CR get() {
// Run the gatherer's finisher (unless it is the default one), then
// finish the collector.
final var finisher = gatherer.finisher();
if (finisher != Gatherer.<A, R>defaultFinisher())
finisher.accept(state, this);
// IF collectorFinisher == null -> IDENTITY_FINISH
return (collectorFinisher == null)
? (CR) collectorState
: collectorFinisher.apply(collectorState);
}
}
/*
* It could be considered to also go to sequential mode if the
* operation is non-greedy AND the combiner is Gatherer.defaultCombiner()
* as those operations will not benefit from upstream parallel
* preprocessing which is the main advantage of the Hybrid evaluation
* strategy.
*/
if (!parallel)
return new Sequential().evaluateUsing(spliterator).get();
// Parallel section starts here:
final var combiner = gatherer.combiner();
/*
* The following implementation of hybrid parallel-sequential
* Gatherer processing borrows heavily from ForeachOrderedTask,
* and adds handling of short-circuiting.
*
* All tasks of one Hybrid tree share a single Sequential instance
* (`localResult`), which is created by the root task.
*/
@SuppressWarnings("serial")
final class Hybrid extends CountedCompleter<Sequential> {
private final long targetSize;
private final Hybrid leftPredecessor;
// Shared by all tasks of the tree; null when the integrator is greedy
private final AtomicBoolean cancelled;
private final Sequential localResult;
private Spliterator<T> spliterator;
// Accessed through the NEXT VarHandle once tasks have been forked
private Hybrid next;
private static final VarHandle NEXT;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
NEXT = l.findVarHandle(Hybrid.class, "next", Hybrid.class);
} catch (Exception e) {
throw new InternalError(e);
}
}
// Creates the root task
protected Hybrid(Spliterator<T> spliterator) {
super(null);
this.spliterator = spliterator;
this.targetSize =
AbstractTask.suggestTargetSize(spliterator.estimateSize());
this.localResult = new Sequential();
this.cancelled = greedy ? null : new AtomicBoolean(false);
this.leftPredecessor = null;
}
// Creates a child task, which inherits the target size, the shared
// result and the shared cancellation flag from its parent
Hybrid(Hybrid parent, Spliterator<T> spliterator, Hybrid leftPredecessor) {
super(parent);
this.spliterator = spliterator;
this.targetSize = parent.targetSize;
this.localResult = parent.localResult;
this.cancelled = parent.cancelled;
this.leftPredecessor = leftPredecessor;
}
@Override
public Sequential getRawResult() {
return localResult;
}
@Override
public void setRawResult(Sequential result) {
// Only `null` is accepted; the result is always `localResult`
if (result != null) throw new IllegalStateException();
}
@Override
public void compute() {
var task = this;
Spliterator<T> rightSplit = task.spliterator, leftSplit;
long sizeThreshold = task.targetSize;
boolean forkRight = false;
// Keep splitting while not cancelled, the remaining size is above the
// threshold and the spliterator can still be split
while ((greedy || !cancelled.get())
&& rightSplit.estimateSize() > sizeThreshold
&& (leftSplit = rightSplit.trySplit()) != null) {
var leftChild = new Hybrid(task, leftSplit, task.leftPredecessor);
var rightChild = new Hybrid(task, rightSplit, leftChild);
/* leftChild and rightChild were just created and not
* fork():ed yet so no need for a volatile write
*/
leftChild.next = rightChild;
// Fork the parent task
// Completion of the left and right children "happens-before"
// completion of the parent
task.addToPendingCount(1);
// Completion of the left child "happens-before" completion of
// the right child
rightChild.addToPendingCount(1);
// If task is not on the left spine
if (task.leftPredecessor != null) {
/*
* Completion of left-predecessor, or left subtree,
* "happens-before" completion of left-most leaf node of
* right subtree.
* The left child's pending count needs to be updated before
* it is associated in the completion map, otherwise the
* left child can complete prematurely and violate the
* "happens-before" constraint.
*/
leftChild.addToPendingCount(1);
// Update association of left-predecessor to left-most
// leaf node of right subtree
if (NEXT.compareAndSet(task.leftPredecessor, task, leftChild)) {
// If replaced, adjust the pending count of the parent
// to complete when its children complete
task.addToPendingCount(-1);
} else {
// Left-predecessor has already completed, parent's
// pending count is adjusted by left-predecessor;
// left child is ready to complete
leftChild.addToPendingCount(-1);
}
}
// Alternate which child is forked and which one is continued with
// in this thread
if (forkRight) {
rightSplit = leftSplit;
task = leftChild;
rightChild.fork();
} else {
task = rightChild;
leftChild.fork();
}
forkRight = !forkRight;
}
/*
* Task's pending count is either 0 or 1. If 1 then the completion
* map will contain a value that is task, and two calls to
* tryComplete are required for completion, one below and one
* triggered by the completion of task's left-predecessor in
* onCompletion. Therefore there is no data race within the if
* block.
*
* IMPORTANT: Currently we only perform the processing of this
* upstream data if we know the operation is greedy -- as we cannot
* safely speculate on the cost/benefit ratio of parallelizing
* the pre-processing of upstream data under short-circuiting.
*/
if (greedy && task.getPendingCount() > 0) {
// Upstream elements are buffered
NodeBuilder<T> nb = new NodeBuilder<>();
rightSplit.forEachRemaining(nb); // Run the upstream
task.spliterator = nb.build().spliterator();
}
task.tryComplete();
}
@Override
public void onCompletion(CountedCompleter<?> caller) {
var s = spliterator;
spliterator = null; // GC assistance
/* Performance sensitive since each leaf-task could have a
* spliterator of size 1 which means that all else is overhead
* which needs minimization.
*
* The elements of this task are fed into the shared Sequential
* unless cancellation was already signalled; if a non-greedy
* evaluation stops proceeding, cancellation is signalled.
*/
if (s != null
&& (greedy || !cancelled.get())
&& !localResult.evaluateUsing(s).proceed
&& !greedy)
cancelled.set(true);
// The completion of this task *and* the dumping of elements
// "happens-before" completion of the associated left-most leaf task
// of right subtree (if any, which can be this task's right sibling)
@SuppressWarnings("unchecked")
var leftDescendant = (Hybrid) NEXT.getAndSet(this, null);
if (leftDescendant != null) {
leftDescendant.tryComplete();
}
}
}
/*
* The following implementation of parallel Gatherer processing
* borrows heavily from AbstractShortCircuitTask
*
* Each leaf task evaluates its own Sequential instance; the results of
* sibling tasks are merged when their parent completes.
*/
@SuppressWarnings("serial")
final class Parallel extends CountedCompleter<Sequential> {
private Spliterator<T> spliterator;
private Parallel leftChild; // Only non-null if rightChild is
private Parallel rightChild; // Only non-null if leftChild is
private Sequential localResult;
private volatile boolean canceled;
private long targetSize; // lazily initialized
// Creates a child task
private Parallel(Parallel parent, Spliterator<T> spliterator) {
super(parent);
this.targetSize = parent.targetSize;
this.spliterator = spliterator;
}
// Creates the root task
Parallel(Spliterator<T> spliterator) {
super(null);
this.targetSize = 0L;
this.spliterator = spliterator;
}
// Returns the cached target size, computing it from the given size
// estimate while it is still 0
private long getTargetSize(long sizeEstimate) {
long s;
return ((s = targetSize) != 0
? s
: (targetSize = AbstractTask.suggestTargetSize(sizeEstimate)));
}
@Override
public Sequential getRawResult() {
return localResult;
}
@Override
public void setRawResult(Sequential result) {
// Only `null` is accepted; the result is always `localResult`
if (result != null) throw new IllegalStateException();
}
// Evaluates this task's elements into a fresh Sequential; if a
// non-greedy evaluation stopped proceeding, later tasks are cancelled
private void doProcess() {
if (!(localResult = new Sequential()).evaluateUsing(spliterator).proceed
&& !greedy)
cancelLaterTasks();
}
@Override
public void compute() {
Spliterator<T> rs = spliterator, ls;
long sizeEstimate = rs.estimateSize();
final long sizeThreshold = getTargetSize(sizeEstimate);
Parallel task = this;
boolean forkRight = false;
boolean proceed;
// Keep splitting while not cancelled, the remaining size is above the
// threshold and the spliterator can still be split
while ((proceed = (greedy || !task.isRequestedToCancel()))
&& sizeEstimate > sizeThreshold
&& (ls = rs.trySplit()) != null) {
final var leftChild = task.leftChild = new Parallel(task, ls);
final var rightChild = task.rightChild = new Parallel(task, rs);
task.setPendingCount(1);
// Alternate which child is forked and which one is continued with
// in this thread
if (forkRight) {
rs = ls;
task = leftChild;
rightChild.fork();
} else {
task = rightChild;
leftChild.fork();
}
forkRight = !forkRight;
sizeEstimate = rs.estimateSize();
}
// A cancelled task completes without processing its elements
if (proceed)
task.doProcess();
task.tryComplete();
}
Sequential merge(Sequential l, Sequential r) {
/*
* Only join the right if the left side didn't short-circuit,
* or when greedy
*/
if (greedy || (l != null && r != null && l.proceed)) {
l.state = combiner.apply(l.state, r.state);
l.collectorState =
collectorCombiner.apply(l.collectorState, r.collectorState);
l.proceed = r.proceed;
return l;
}
// Otherwise keep the left result if there is one, else the right
return (l != null) ? l : r;
}
@Override
public void onCompletion(CountedCompleter<?> caller) {
spliterator = null; // GC assistance
if (leftChild != null) {
/* Results can only be null in the case where there's
* short-circuiting or when Gatherers are stateful but
* use `null` as their state value.
*/
localResult = merge(leftChild.localResult, rightChild.localResult);
leftChild = rightChild = null; // GC assistance
}
}
@SuppressWarnings("unchecked")
private Parallel getParent() {
return (Parallel) getCompleter();
}
// A task is requested to cancel if it, or any of its ancestors, has
// been marked as canceled
private boolean isRequestedToCancel() {
boolean cancel = canceled;
if (!cancel) {
for (Parallel parent = getParent();
!cancel && parent != null;
parent = parent.getParent())
cancel = parent.canceled;
}
return cancel;
}
private void cancelLaterTasks() {
// Go up the tree, cancel right siblings of this node and all parents
for (Parallel parent = getParent(), node = this;
parent != null;
node = parent, parent = parent.getParent()) {
// If node is a left child of parent, then has a right sibling
if (parent.leftChild == node)
parent.rightChild.canceled = true;
}
}
}
// Gatherers which supply a combiner are evaluated fully in parallel;
// all others use the hybrid parallel-sequential strategy
if (combiner != Gatherer.defaultCombiner())
return new Parallel(spliterator).invoke().get();
else
return new Hybrid(spliterator).invoke().get();
}
}

View file

@ -0,0 +1,707 @@
/*
* Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import jdk.internal.access.SharedSecrets;
import jdk.internal.javac.PreviewFeature;
import jdk.internal.vm.annotation.ForceInline;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Gatherer.Integrator;
import java.util.stream.Gatherer.Downstream;
/**
* Implementations of {@link Gatherer} that provide useful intermediate
* operations, such as windowing functions, folding functions,
* transforming elements concurrently, etc.
*
* @since 22
*/
@PreviewFeature(feature = PreviewFeature.Feature.STREAM_GATHERERS)
public final class Gatherers {
private Gatherers() { } // This class is not intended to be instantiated
// Public built-in Gatherers and factory methods for them
/**
* Returns a Gatherer that gathers elements into windows
* -- encounter-ordered groups of elements -- of a fixed size.
* If the stream is empty then no window will be produced.
* The last window may contain fewer elements than the supplied window size.
*
* <p>Example:
* {@snippet lang = java:
* // will contain: [[1, 2, 3], [4, 5, 6], [7, 8]]
* List<List<Integer>> windows =
* Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowFixed(3)).toList();
* }
*
* @implSpec Each window produced is an unmodifiable List; calls to any
* mutator method will always cause {@code UnsupportedOperationException}
* to be thrown. There are no guarantees on the implementation type or
* serializability of the produced Lists.
*
* @apiNote For efficiency reasons, windows may be allocated contiguously
* and eagerly. This means that choosing large window sizes for
* small streams may use excessive memory for the duration of
* evaluation of this operation.
*
* @param windowSize the size of the windows
* @param <TR> the type of elements the returned gatherer consumes
* and the contents of the windows it produces
* @return a new gatherer which groups elements into fixed-size windows
* @throws IllegalArgumentException when {@code windowSize} is less than 1
*/
public static <TR> Gatherer<TR, ?, List<TR>> windowFixed(int windowSize) {
    if (windowSize < 1)
        throw new IllegalArgumentException("'windowSize' must be greater than zero");

    // Mutable state of one evaluation: the window currently being filled
    class FixedWindow {
        Object[] window = new Object[windowSize];
        int at = 0; // index at which the next element is stored

        boolean integrate(TR element, Downstream<? super List<TR>> downstream) {
            window[at++] = element;
            if (at < windowSize)
                return true; // window not full yet, nothing to emit

            // The window is full: start a fresh one and emit the full
            // one; the emitted array is not written to afterwards
            final Object[] full = window;
            window = new Object[windowSize];
            at = 0;
            return downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(full)
            );
        }

        void finish(Downstream<? super List<TR>> downstream) {
            // Nothing buffered, or nobody interested: nothing to emit
            if (at <= 0 || downstream.isRejecting())
                return;

            // Emit the trailing, partially filled window trimmed to size
            final Object[] remainder = new Object[at];
            System.arraycopy(window, 0, remainder, 0, at);
            window = null;
            at = 0;
            downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(remainder)
            );
        }
    }

    return Gatherer.<TR, FixedWindow, List<TR>>ofSequential(
        FixedWindow::new,                                   // initializer
        Integrator.<FixedWindow, TR, List<TR>>ofGreedy(
            FixedWindow::integrate),                        // integrator
        FixedWindow::finish                                 // finisher
    );
}
/**
* Returns a Gatherer that gathers elements into windows --
* encounter-ordered groups of elements -- of a given size, where each
* subsequent window includes all elements of the previous window except
* for the least recent, and adds the next element in the stream.
* If the stream is empty then no window will be produced. If the size of
* the stream is smaller than the window size then only one window will
* be produced, containing all elements in the stream.
*
* <p>Example:
* {@snippet lang = java:
* // will contain: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
* List<List<Integer>> windows2 =
* Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();
*
* // will contain: [[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
* List<List<Integer>> windows6 =
* Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(6)).toList();
* }
*
* @implSpec Each window produced is an unmodifiable List; calls to any
* mutator method will always cause {@code UnsupportedOperationException}
* to be thrown. There are no guarantees on the implementation type or
* serializability of the produced Lists.
*
* @apiNote For efficiency reasons, windows may be allocated contiguously
* and eagerly. This means that choosing large window sizes for
* small streams may use excessive memory for the duration of
* evaluation of this operation.
*
* @param windowSize the size of the windows
* @param <TR> the type of elements the returned gatherer consumes
* and the contents of the windows it produces
* @return a new gatherer which groups elements into sliding windows
* @throws IllegalArgumentException when {@code windowSize} is less than 1
*/
public static <TR> Gatherer<TR, ?, List<TR>> windowSliding(int windowSize) {
    if (windowSize < 1)
        throw new IllegalArgumentException("'windowSize' must be greater than zero");

    // Mutable state of one evaluation: the window currently being filled
    class SlidingWindow {
        Object[] window = new Object[windowSize];
        int at = 0;                 // index at which the next element is stored
        boolean firstWindow = true; // true until the first window is emitted

        boolean integrate(TR element, Downstream<? super List<TR>> downstream) {
            window[at++] = element;
            if (at < windowSize)
                return true; // window not full yet, nothing to emit

            // The window is full: the next window starts out as this one
            // minus its oldest element, and the full one is emitted; the
            // emitted array is not written to afterwards
            final Object[] full = window;
            final Object[] successor = new Object[windowSize];
            System.arraycopy(full, 1, successor, 0, windowSize - 1);
            window = successor;
            at -= 1;
            firstWindow = false;
            return downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(full)
            );
        }

        void finish(Downstream<? super List<TR>> downstream) {
            // Only a stream shorter than the window size leaves elements
            // that were never emitted as part of a window
            if (!firstWindow || at <= 0 || downstream.isRejecting())
                return;

            final Object[] remainder = new Object[at];
            System.arraycopy(window, 0, remainder, 0, at);
            window = null;
            at = 0;
            downstream.push(
                SharedSecrets.getJavaUtilCollectionAccess()
                             .listFromTrustedArrayNullsAllowed(remainder)
            );
        }
    }

    return Gatherer.<TR, SlidingWindow, List<TR>>ofSequential(
        SlidingWindow::new,                                 // initializer
        Integrator.<SlidingWindow, TR, List<TR>>ofGreedy(
            SlidingWindow::integrate),                      // integrator
        SlidingWindow::finish                               // finisher
    );
}
/**
* Returns a Gatherer that performs an ordered, <i>reduction-like</i>,
* transformation for scenarios where no combiner-function can be
* implemented, or for reductions which are intrinsically
* order-dependent.
*
* @implSpec If no exceptions are thrown during processing, then this
* operation only ever produces a single element.
*
* <p>Example:
* {@snippet lang = java:
* // will contain: Optional["123456789"]
* Optional<String> numberString =
* Stream.of(1,2,3,4,5,6,7,8,9)
* .gather(
* Gatherers.fold(() -> "", (string, number) -> string + number)
* )
* .findFirst();
* }
*
* @see java.util.stream.Stream#reduce(Object, BinaryOperator)
*
* @param initial the identity value for the fold operation
* @param folder the folding function
* @param <T> the type of elements the returned gatherer consumes
* @param <R> the type of elements the returned gatherer produces
* @return a new Gatherer
* @throws NullPointerException if any of the parameters are {@code null}
*/
public static <T, R> Gatherer<T, ?, R> fold(
        Supplier<R> initial,
        BiFunction<? super R, ? super T, ? extends R> folder) {
    Objects.requireNonNull(initial, "'initial' must not be null");
    Objects.requireNonNull(folder, "'folder' must not be null");

    // Holds the running value; seeded from `initial` when the state is created
    class State {
        R value = initial.get();

        // Folds the element into the running value; never emits and
        // never short-circuits
        boolean integrate(T element, Downstream<? super R> downstream) {
            value = folder.apply(value, element);
            return true;
        }

        // Emits the final value as the single output element
        void finish(Downstream<? super R> downstream) {
            downstream.push(value);
        }
    }

    return Gatherer.<T, State, R>ofSequential(
        State::new,
        Integrator.<State, T, R>ofGreedy(State::integrate),
        State::finish
    );
}
/**
* Returns a Gatherer that performs a Prefix Scan -- an incremental
* accumulation -- using the provided functions. Starting with an
* initial value obtained from the {@code Supplier}, each subsequent
* value is obtained by applying the {@code BiFunction} to the current
* value and the next input element, after which the resulting value is
* produced downstream.
*
* <p>Example:
* {@snippet lang = java:
* // will contain: ["1", "12", "123", "1234", "12345", "123456", "1234567", "12345678", "123456789"]
* List<String> numberStrings =
* Stream.of(1,2,3,4,5,6,7,8,9)
* .gather(
* Gatherers.scan(() -> "", (string, number) -> string + number)
* )
* .toList();
* }
*
* @param initial the supplier of the initial value for the scanner
* @param scanner the function to apply for each element
* @param <T> the type of element which this gatherer consumes
* @param <R> the type of element which this gatherer produces
* @return a new Gatherer which performs a prefix scan
* @throws NullPointerException if any of the parameters are {@code null}
*/
public static <T, R> Gatherer<T, ?, R> scan(
        Supplier<R> initial,
        BiFunction<? super R, ? super T, ? extends R> scanner) {
    Objects.requireNonNull(initial, "'initial' must not be null");
    Objects.requireNonNull(scanner, "'scanner' must not be null");

    // Holds the running value; seeded from `initial` when the state is created
    class State {
        R current = initial.get();
    }

    return Gatherer.<T, State, R>ofSequential(
        State::new,
        Integrator.<State, T, R>ofGreedy((state, element, downstream) -> {
            // Advance the running value, then emit it; the result of the
            // push decides whether more elements are wanted
            state.current = scanner.apply(state.current, element);
            return downstream.push(state.current);
        })
    );
}
/**
* An operation which executes a function concurrently
* with a configured level of max concurrency, using
* <a href="{@docRoot}/java.base/java/lang/Thread.html#virtual-threads">virtual threads</a>.
* This operation preserves the ordering of the stream.
*
* @apiNote In progress tasks will be attempted to be cancelled,
* on a best-effort basis, in situations where the downstream no longer
* wants to receive any more elements.
*
* @implSpec If a result of the function is to be pushed downstream but
* instead the function completed exceptionally then the corresponding
* exception will instead be rethrown by this method as an instance of
* {@link RuntimeException}, after which any remaining tasks are canceled.
*
* @param maxConcurrency the maximum concurrency desired
* @param mapper a function to be executed concurrently
* @param <T> the type of input
* @param <R> the type of output
* @return a new Gatherer
* @throws IllegalArgumentException if {@code maxConcurrency} is less than 1
* @throws NullPointerException if {@code mapper} is {@code null}
*/
public static <T, R> Gatherer<T,?,R> mapConcurrent(
        final int maxConcurrency,
        final Function<? super T, ? extends R> mapper) {
    if (maxConcurrency < 1)
        throw new IllegalArgumentException(
                "'maxConcurrency' must be greater than 0");
    Objects.requireNonNull(mapper, "'mapper' must not be null");

    class State {
        // In-flight tasks, in encounter order.
        // ArrayDeque default initial size is 16
        final ArrayDeque<Future<R>> window =
                new ArrayDeque<>(Math.min(maxConcurrency, 16));
        // One permit per task which has been created but is not yet done
        final Semaphore windowLock = new Semaphore(maxConcurrency);

        final boolean integrate(T element,
                                Downstream<? super R> downstream) {
            if (!downstream.isRejecting())
                createTaskFor(element);
            return flush(0, downstream);
        }

        final void createTaskFor(T element) {
            windowLock.acquireUninterruptibly();
            /* The permit is released in `done()` rather than at the end of
             * the callable: `done()` runs exactly once for every task, no
             * matter whether it completed normally, failed, or was
             * cancelled. A task which is cancelled before its thread gets
             * to run never executes its callable, so releasing from within
             * the callable would leak the permit and could make a later
             * `acquireUninterruptibly()` block forever.
             */
            var task = new FutureTask<R>(() -> mapper.apply(element)) {
                @Override
                protected void done() {
                    windowLock.release();
                }
            };
            var wasAddedToWindow = window.add(task);
            assert wasAddedToWindow;
            Thread.startVirtualThread(task);
        }

        /* Pushes results downstream in encounter order: results which are
         * already done are always pushed, and at most `atLeastN`
         * not-yet-done results are waited for.
         * Returns whether more elements should be integrated.
         */
        final boolean flush(long atLeastN,
                            Downstream<? super R> downstream) {
            boolean proceed = !downstream.isRejecting();
            boolean interrupted = false;
            boolean completed = false; // true only if the loop ended normally
            try {
                Future<R> current;
                while (proceed
                        && (current = window.peek()) != null
                        && (current.isDone() || atLeastN > 0)) {
                    proceed &= downstream.push(current.get());
                    atLeastN -= 1;
                    var correctRemoval = window.pop() == current;
                    assert correctRemoval;
                }
                completed = true;
            } catch (InterruptedException ie) {
                proceed = false;
                interrupted = true; // restored once cleanup is done
            } catch (ExecutionException e) {
                proceed = false; // Ensure cleanup
                final var cause = e.getCause();
                throw (cause instanceof RuntimeException re)
                        ? re
                        : new RuntimeException(cause == null ? e : cause);
            } finally {
                // Clean up: cancel everything still in flight when no more
                // results are wanted, or when anything was thrown above
                // (including exceptions thrown by downstream.push(...))
                if (!proceed || !completed) {
                    Future<R> next;
                    while ((next = window.pollFirst()) != null) {
                        next.cancel(true);
                    }
                }
            }
            if (interrupted)
                Thread.currentThread().interrupt();
            return proceed;
        }
    }

    return Gatherer.ofSequential(
        State::new,
        Integrator.<State, T, R>ofGreedy(State::integrate),
        // The finisher waits for all remaining tasks
        (state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
    );
}
// Implementation details
/*
* This enum is used to provide the default functions for the
* factory methods
* and for the default methods for when implementing the Gatherer interface.
*
* This serves the following purposes:
* 1. removes the need for using `null` for signalling absence of specified
* value and thereby hiding user bugs
* 2. allows to check against these default values to avoid calling methods
* needlessly
* 3. allows for more efficient composition and evaluation
*/
@SuppressWarnings("rawtypes")
enum Value implements Supplier, BinaryOperator, BiConsumer {
    DEFAULT;

    // A combiner for Void state: takes two values and returns null
    final BinaryOperator<Void> statelessCombiner = new BinaryOperator<>() {
        @Override
        public Void apply(Void left, Void right) {
            return null;
        }
    };

    // Typed views of this constant in its three roles

    @ForceInline
    @SuppressWarnings("unchecked")
    <A> Supplier<A> initializer() {
        return (Supplier<A>) this;
    }

    @ForceInline
    @SuppressWarnings("unchecked")
    <T> BinaryOperator<T> combiner() {
        return (BinaryOperator<T>) this;
    }

    @ForceInline
    @SuppressWarnings("unchecked")
    <T, R> BiConsumer<T, Gatherer.Downstream<? super R>> finisher() {
        return (BiConsumer<T, Downstream<? super R>>) this;
    }

    // Supplier: the default initializer supplies no state
    @Override
    public Object get() {
        return null;
    }

    // BinaryOperator: the default combiner must never be invoked
    @Override
    public Object apply(Object left, Object right) {
        throw new UnsupportedOperationException("This combiner cannot be used!");
    }

    // BiConsumer: the default finisher does nothing
    @Override
    public void accept(Object state, Object downstream) {
    }
}
record GathererImpl<T, A, R>(
        @Override Supplier<A> initializer,
        @Override Integrator<A, T, R> integrator,
        @Override BinaryOperator<A> combiner,
        @Override BiConsumer<A, Downstream<? super R>> finisher)
        implements Gatherer<T, A, R> {

    /*
     * Null-checking factory: rejects a `null` function with a
     * NullPointerException whose message names the offending parameter.
     * The canonical constructor performs no such checks.
     */
    static <T, A, R> GathererImpl<T, A, R> of(
            Supplier<A> initializer,
            Integrator<A, T, R> integrator,
            BinaryOperator<A> combiner,
            BiConsumer<A, Downstream<? super R>> finisher) {
        Objects.requireNonNull(initializer, "initializer");
        Objects.requireNonNull(integrator, "integrator");
        Objects.requireNonNull(combiner, "combiner");
        Objects.requireNonNull(finisher, "finisher");
        return new GathererImpl<>(initializer, integrator, combiner, finisher);
    }
}
/**
* A {@code Gatherer} formed by composing two gatherers: elements are
* integrated by {@code left}, and whatever {@code left} pushes downstream
* is integrated by {@code right}.
*
* <p>The initializer, integrator, combiner and finisher exposed by this
* class are all obtained from a {@code GathererImpl} that is built lazily
* from {@code left} and {@code right} by {@link #impl(Gatherer, Gatherer)}
* and then cached in the {@code impl} field.
*
* @param <T> the element type consumed by {@code left}
* @param <A> the state type of {@code left}
* @param <R> the element type handed from {@code left} to {@code right}
* @param <AA> the state type of {@code right}
* @param <RR> the element type pushed downstream by {@code right}
*/
static final class Composite<T, A, R, AA, RR> implements Gatherer<T, Object, RR> {
// The first gatherer of the composition; receives the upstream elements.
private final Gatherer<T, A, ? extends R> left;
// The second gatherer of the composition; receives the output of `left`.
private final Gatherer<? super R, AA, ? extends RR> right;
// FIXME change `impl` to a computed constant when available
// Lazily created composition of `left` and `right`; null until first use.
private GathererImpl<T, Object, RR> impl;
/**
* Creates a Composite of {@code left} followed by {@code right}.
* The arguments are stored as given; no validation is performed here.
*/
static <T, A, R, AA, RR> Composite<T, A, R, AA, RR> of(
Gatherer<T, A, ? extends R> left,
Gatherer<? super R, AA, ? extends RR> right) {
return new Composite<>(left, right);
}
private Composite(Gatherer<T, A, ? extends R> left,
Gatherer<? super R, AA, ? extends RR> right) {
this.left = left;
this.right = right;
}
/**
* Returns the cached composed implementation, building and caching it
* on first use.
*/
@SuppressWarnings("unchecked")
private GathererImpl<T, Object, RR> impl() {
// ATTENTION: this method currently relies on a "benign" data-race
// as it should deterministically produce the same result even if
// initialized concurrently on different threads.
// The field is read once into a local so that the null check and the
// returned value refer to the same read.
var i = impl;
return i != null
? i
: (impl = (GathererImpl<T, Object, RR>)impl(left, right));
}
// The four Gatherer functions below all delegate to the composed implementation.
@Override public Supplier<Object> initializer() {
return impl().initializer();
}
@Override public Integrator<Object, T, RR> integrator() {
return impl().integrator();
}
@Override public BinaryOperator<Object> combiner() {
return impl().combiner();
}
@Override public BiConsumer<Object, Downstream<? super RR>> finisher() {
return impl().finisher();
}
/**
* Composes this Composite with {@code that}.
*
* <p>Instead of wrapping this Composite as the left side of a new
* composition, the result is re-associated to the right:
* {@code left.andThen(right.andThen(that))}. When {@code that} is itself
* exactly a {@code Composite}, its own {@code left} and {@code right}
* are appended individually rather than appending {@code that} as a whole.
*
* @throws NullPointerException if {@code that} is null (raised by the
* {@code that.getClass()} call)
*/
@Override
public <RRR> Gatherer<T, ?, RRR> andThen(
Gatherer<? super RR, ?, ? extends RRR> that) {
if (that.getClass() == Composite.class) {
@SuppressWarnings("unchecked")
final var c =
(Composite<? super RR, ?, Object, ?, ? extends RRR>) that;
return left.andThen(right.andThen(c.left).andThen(c.right));
} else {
return left.andThen(right.andThen(that));
}
}
/**
* Builds the {@code GathererImpl} that evaluates {@code left} followed by
* {@code right}.
*
* <p>The four functions of each gatherer are read once up front. Two
* strategies exist: a state-free one used when both gatherers are
* stateless and greedy, and a general one that keeps both states and a
* "proceed" flag per side in a local {@code State} class.
*/
static final <T, A, R, AA, RR> GathererImpl<T, ?, RR> impl(
Gatherer<T, A, R> left, Gatherer<? super R, AA, RR> right) {
final var leftInitializer = left.initializer();
final var leftIntegrator = left.integrator();
final var leftCombiner = left.combiner();
final var leftFinisher = left.finisher();
final var rightInitializer = right.initializer();
final var rightIntegrator = right.integrator();
final var rightCombiner = right.combiner();
final var rightFinisher = right.finisher();
// A side counts as stateless when its initializer is (by identity) the
// default initializer, and as greedy when its integrator is a Greedy.
final var leftStateless = leftInitializer == Gatherer.defaultInitializer();
final var rightStateless = rightInitializer == Gatherer.defaultInitializer();
final var leftGreedy = leftIntegrator instanceof Integrator.Greedy;
final var rightGreedy = rightIntegrator instanceof Integrator.Greedy;
/*
* For pairs of stateless and greedy Gatherers, we can optimize
* evaluation as we do not need to track any state nor any
* short-circuit signals. This can provide significant
* performance improvements.
*/
if (leftStateless && rightStateless && leftGreedy && rightGreedy) {
return new GathererImpl<>(
Gatherer.defaultInitializer(),
// Both integrators are invoked with a null state; output of left is
// forwarded straight into right's integrator.
Gatherer.Integrator.ofGreedy((unused, element, downstream) ->
leftIntegrator.integrate(
null,
element,
r -> rightIntegrator.integrate(null, r, downstream))
),
// The default combiner is kept if either side uses it; otherwise the
// shared stateless combiner is used.
(leftCombiner == Gatherer.defaultCombiner()
|| rightCombiner == Gatherer.defaultCombiner())
? Gatherer.defaultCombiner()
: Value.DEFAULT.statelessCombiner
,
// The default finisher is kept only if both sides use it; otherwise
// left is finished first (its output still flows through right's
// integrator), then right is finished.
(leftFinisher == Gatherer.<A,R>defaultFinisher()
&& rightFinisher == Gatherer.<AA,RR>defaultFinisher())
? Gatherer.defaultFinisher()
: (unused, downstream) -> {
if (leftFinisher != Gatherer.<A,R>defaultFinisher())
leftFinisher.accept(
null,
r -> rightIntegrator.integrate(null, r, downstream));
if (rightFinisher != Gatherer.<AA,RR>defaultFinisher())
rightFinisher.accept(null, downstream);
}
);
} else {
// State of the composed gatherer: the state of each side plus a flag
// per side recording whether that side still wants more input.
class State {
final A leftState;
final AA rightState;
boolean leftProceed;
boolean rightProceed;
private State(A leftState, AA rightState,
boolean leftProceed, boolean rightProceed) {
this.leftState = leftState;
this.rightState = rightState;
this.leftProceed = leftProceed;
this.rightProceed = rightProceed;
}
// Initial state: each side's initializer is only invoked if that side
// is not stateless (null is used otherwise); both flags start as true.
State() {
this(leftStateless ? null : leftInitializer.get(),
rightStateless ? null : rightInitializer.get(),
true, true);
}
// Combiner of the composed gatherer: merges each side's state with its
// own combiner (skipped for stateless sides). The new leftProceed is
// derived from both flags of `this`, the new rightProceed from both
// flags of the `right` argument.
// NOTE: the parameter `right` shadows the enclosing method's `right` gatherer.
State joinLeft(State right) {
return new State(
leftStateless ? null : leftCombiner.apply(this.leftState, right.leftState),
rightStateless ? null : rightCombiner.apply(this.rightState, right.rightState),
this.leftProceed && this.rightProceed,
right.leftProceed && right.rightProceed);
}
// Integrator of the composed gatherer: returns false once either side
// has signalled that it does not want more input.
boolean integrate(T t, Downstream<? super RR> c) {
/*
* rightProceed must be checked after integration of
* left since that can cause right to short-circuit
* We always want to conditionally write leftProceed
* here, which means that we only do so if we are
* known to be not-greedy.
*/
return (leftIntegrator.integrate(leftState, t, r -> rightIntegrate(r, c))
|| leftGreedy
|| (leftProceed = false))
&& (rightGreedy || rightProceed);
}
// Finisher of the composed gatherer: left is finished first, with its
// output still routed through rightIntegrate; then right is finished.
// Default finishers are skipped.
void finish(Downstream<? super RR> c) {
if (leftFinisher != Gatherer.<A, R>defaultFinisher())
leftFinisher.accept(leftState, r -> rightIntegrate(r, c));
if (rightFinisher != Gatherer.<AA, RR>defaultFinisher())
rightFinisher.accept(rightState, c);
}
/*
* Currently we use the following to ferry elements from
* the left Gatherer to the right Gatherer, but we create
* the Gatherer.Downstream as a lambda which means that
* the default implementation of `isKnownDone()` is used.
*
* If it is determined that we want to be able to support
* the full interface of Gatherer.Downstream then we have
* the following options:
* 1. Have State implement Downstream<? super R>
* and store the passed in Downstream<? super RR>
* downstream as an instance field in integrate()
* and read it in push(R r).
* 2. Allocate a new Gatherer.Downstream<? super R> for
* each invocation of integrate() which might prove
* costly.
*/
public boolean rightIntegrate(R r, Downstream<? super RR> downstream) {
// The following logic is highly performance sensitive
// Right is only invoked while it is greedy or has not yet declined
// further input; rightProceed is only written when right is not
// greedy and its integrator returned false.
return (rightGreedy || rightProceed)
&& (rightIntegrator.integrate(rightState, r, downstream)
|| rightGreedy
|| (rightProceed = false));
}
}
return new GathererImpl<T, State, RR>(
State::new,
// The composed integrator is only greedy when both sides are greedy.
(leftGreedy && rightGreedy)
? Integrator.<State, T, RR>ofGreedy(State::integrate)
: Integrator.<State, T, RR>of(State::integrate),
// The default combiner is kept if either side uses it.
(leftCombiner == Gatherer.defaultCombiner()
|| rightCombiner == Gatherer.defaultCombiner())
? Gatherer.defaultCombiner()
: State::joinLeft,
// The default finisher is kept only if both sides use it.
(leftFinisher == Gatherer.<A, R>defaultFinisher()
&& rightFinisher == Gatherer.<AA, RR>defaultFinisher())
? Gatherer.defaultFinisher()
: State::finish
);
}
}
}
}

View file

@ -90,12 +90,27 @@ abstract class ReferencePipeline<P_IN, P_OUT>
* Constructor for appending an intermediate operation onto an existing
* pipeline.
*
* @param upstream the upstream element source
* @param opFlags The operation flags for this operation, described in
* {@link StreamOpFlag}
*/
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
// All linking is done by the AbstractPipeline constructor, which is
// documented to throw IllegalStateException if the upstream stage is
// already linked or consumed.
super(upstream, opFlags);
}
/**
* Constructor for replacing an intermediate operation stage of an existing
* pipeline. This constructor only delegates to the corresponding
* three-argument {@code AbstractPipeline} constructor, which validates how
* {@code upupstream} and {@code upstream} are linked to each other.
*
* @param upupstream the upstream of the upstream element source
* @param upstream the upstream element source
* @param opFlags The operation flags for this operation, described in
* {@link StreamOpFlag}
* @throws IllegalStateException if {@code upstream} is already linked or
* consumed (as documented on the {@code AbstractPipeline} constructor)
*/
protected ReferencePipeline(AbstractPipeline<?, P_IN, ?> upupstream, AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
super(upupstream, upstream, opFlags);
}
// Shape-specific methods
@Override
@ -667,9 +682,14 @@ abstract class ReferencePipeline<P_IN, P_OUT>
return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}
@Override
public final <R> Stream<R> gather(Gatherer<? super P_OUT, ?, R> gatherer) {
// This pipeline stage is passed as the upstream; the resulting stream is
// whatever GathererOp.of returns for it and the given gatherer.
return GathererOp.of(this, gatherer);
}
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
public <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
@ -687,7 +707,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
}
@Override
public final <R> R collect(Supplier<R> supplier,
public <R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super P_OUT> accumulator,
BiConsumer<R, R> combiner) {
return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));

View file

@ -24,6 +24,8 @@
*/
package java.util.stream;
import jdk.internal.javac.PreviewFeature;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
@ -1051,6 +1053,58 @@ public interface Stream<T> extends BaseStream<T, Stream<T>> {
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
/**
* Returns a stream consisting of the results of applying the given
* {@link Gatherer} to the elements of this stream.
*
* <p>This is a <a href="package-summary.html#StreamOps">stateful
* intermediate operation</a> that is an
* <a href="package-summary.html#Extensibility">extension point</a>.
*
* <p>Gatherers are highly flexible and can describe a vast array of
* possibly stateful operations, with support for short-circuiting, and
* parallelization.
*
* <p>When executed in parallel, multiple intermediate results may be
* instantiated, populated, and merged so as to maintain isolation of
* mutable data structures. Therefore, even when executed in parallel
* with non-thread-safe data structures (such as {@code ArrayList}), no
* additional synchronization is needed for a parallel reduction.
*
* <p>Implementations are allowed, but not required, to detect consecutive
* invocations and compose them into a single, fused, operation. This would
* make the first expression below behave like the second:
*
* <pre>{@code
* var stream1 = Stream.of(...).gather(gatherer1).gather(gatherer2);
* var stream2 = Stream.of(...).gather(gatherer1.andThen(gatherer2));
* }</pre>
*
* @implSpec
* The default implementation obtains the {@link #spliterator() spliterator}
* of this stream, wraps that spliterator so as to support the semantics
* of this operation on traversal, and returns a new stream associated with
* the wrapped spliterator. The returned stream preserves the execution
* characteristics of this stream (namely parallel or sequential execution
* as per {@link #isParallel()}) but the wrapped spliterator may choose to
* not support splitting. When the returned stream is closed, the close
* handlers for both the returned and this stream are invoked.
* Implementations of this interface should provide their own
* implementation of this method.
*
* @see Gatherers
* @param <R> The element type of the new stream
* @param gatherer a gatherer
* @return the new stream
* @since 22
*/
@PreviewFeature(feature = PreviewFeature.Feature.STREAM_GATHERERS)
default <R> Stream<R> gather(Gatherer<? super T, ?, R> gatherer) {
// Re-wrap this stream's spliterator in a stream created by StreamSupport,
// keeping the sequential/parallel mode of this stream. The spliterator is
// obtained before the mode is queried, as arguments evaluate left to right.
final Stream<T> rewrapped = StreamSupport.stream(spliterator(), isParallel());
// Let the re-wrapped stream apply the gatherer.
final Stream<R> gathered = rewrapped.gather(gatherer);
// Closing the returned stream must also run this stream's close handlers.
return gathered.onClose(this::close);
}
/**
* Performs a <a href="package-summary.html#MutableReduction">mutable
* reduction</a> operation on the elements of this stream. A mutable

View file

@ -620,6 +620,19 @@
* but in some cases equivalence may be relaxed to account for differences in
* order.
*
* <h3><a id="Extensibility">Extensibility</a></h3>
*
* <p>Implementing {@link java.util.stream.Collector};
* using the factory method {@code java.util.stream.Collector.of(...)}; or
* using the predefined collectors in {@link java.util.stream.Collectors} allows
* for user-defined, reusable, <em>terminal</em> operations.
*
* <p>Implementing {@link java.util.stream.Gatherer}; using the factory
* methods {@code java.util.stream.Gatherer.of(...)} and
* {@code java.util.stream.Gatherer.ofSequential(...)};
* or using the predefined gatherers in {@link java.util.stream.Gatherers}
* allows for user-defined, reusable, <em>intermediate</em> operations.
*
* <h3><a id="ConcurrentReduction">Reduction, concurrency, and ordering</a></h3>
*
* With some complex reduction operations, for example a {@code collect()} that