8238286: Add new flatMap stream operation that is more amenable to pushing

This patch adds a new flatmap-like operation called mapMulti to the java.util.Stream class as well as the primitive variations of this operation i.e. mapMultiToInt, IntStream mapMulti, etc.

Reviewed-by: psandoz, smarks
This commit is contained in:
Patrick Concannon 2020-08-31 16:12:32 +01:00
parent dd89c92c50
commit 79d12507b3
10 changed files with 899 additions and 15 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -257,7 +257,7 @@ abstract class ReferencePipeline<P_IN, P_OUT>
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
return new Sink.ChainedReference<>(sink) {
// true if cancellationRequested() has been called
boolean cancellationRequestedCalled;
@ -428,6 +428,103 @@ abstract class ReferencePipeline<P_IN, P_OUT>
};
}
@Override
public final <R> Stream<R> mapMulti(BiConsumer<? super P_OUT, ? super Consumer<R>> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
@SuppressWarnings("unchecked")
public void accept(P_OUT u) {
mapper.accept(u, (Consumer<R>) downstream);
}
};
}
};
}
@Override
public final IntStream mapMultiToInt(BiConsumer<? super P_OUT, ? super IntConsumer> mapper) {
Objects.requireNonNull(mapper);
return new IntPipeline.StatelessOp<>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
@SuppressWarnings("unchecked")
public void accept(P_OUT u) {
mapper.accept(u, (IntConsumer)downstream);
}
};
}
};
}
@Override
public final LongStream mapMultiToLong(BiConsumer<? super P_OUT, ? super LongConsumer> mapper) {
Objects.requireNonNull(mapper);
return new LongPipeline.StatelessOp<>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
@SuppressWarnings("unchecked")
public void accept(P_OUT u) {
mapper.accept(u, (LongConsumer) downstream);
}
};
}
};
}
@Override
public final DoubleStream mapMultiToDouble(BiConsumer<? super P_OUT, ? super DoubleConsumer> mapper) {
Objects.requireNonNull(mapper);
return new DoublePipeline.StatelessOp<>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedReference<>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
@SuppressWarnings("unchecked")
public void accept(P_OUT u) {
mapper.accept(u, (DoubleConsumer) downstream);
}
};
}
};
}
@Override
public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
Objects.requireNonNull(action);