8276779: (ch) InputStream returned by Channels.newInputStream should have fast path for SelectableChannels

Reviewed-by: lancea, alanb
This commit is contained in:
Markus Karg 2021-12-04 09:27:23 +00:00 committed by Alan Bateman
parent 02ee337ae0
commit 9642629d15
3 changed files with 110 additions and 11 deletions

View file

@ -69,9 +69,12 @@ public final class Channels {
/** /**
* Constructs a stream that reads bytes from the given channel. * Constructs a stream that reads bytes from the given channel.
* *
* <p> The {@code read} methods of the resulting stream will throw an * <p> The {@code read} and {@code transferTo} methods of the resulting stream
* {@link IllegalBlockingModeException} if invoked while the underlying * will throw an {@link IllegalBlockingModeException} if invoked while the
* channel is in non-blocking mode. The stream will not be buffered, and * underlying channel is in non-blocking mode. The {@code transferTo} method
* will also throw an {@code IllegalBlockingModeException} if invoked to
* transfer bytes to an output stream that writes to an underlying channel in
* non-blocking mode. The stream will not be buffered, and
* it will not support the {@link InputStream#mark mark} or {@link * it will not support the {@link InputStream#mark mark} or {@link
* InputStream#reset reset} methods. The stream will be safe for access by * InputStream#reset reset} methods. The stream will be safe for access by
* multiple concurrent threads. Closing the stream will in turn cause the * multiple concurrent threads. Closing the stream will in turn cause the

View file

@ -34,6 +34,7 @@ import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel; import java.nio.channels.SeekableByteChannel;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays; import java.util.Arrays;
import java.util.Objects; import java.util.Objects;
import jdk.internal.util.ArraysSupport; import jdk.internal.util.ArraysSupport;
@ -238,15 +239,28 @@ public class ChannelInputStream
Objects.requireNonNull(out, "out"); Objects.requireNonNull(out, "out");
if (out instanceof ChannelOutputStream cos if (out instanceof ChannelOutputStream cos
&& ch instanceof FileChannel fc && ch instanceof FileChannel fc) {
&& cos.channel() instanceof FileChannel dst) { WritableByteChannel wbc = cos.channel();
return transfer(fc, dst);
if (wbc instanceof FileChannel dst) {
return transfer(fc, dst);
}
if (wbc instanceof SelectableChannel sc) {
synchronized (sc.blockingLock()) {
if (!sc.isBlocking())
throw new IllegalBlockingModeException();
return transfer(fc, wbc);
}
}
return transfer(fc, wbc);
} }
return super.transferTo(out); return super.transferTo(out);
} }
private static long transfer(FileChannel src, FileChannel dst) throws IOException { private static long transfer(FileChannel src, WritableByteChannel dst) throws IOException {
long initialPos = src.position(); long initialPos = src.position();
long pos = initialPos; long pos = initialPos;
try { try {

View file

@ -28,11 +28,19 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.Arrays; import java.util.Arrays;
import java.util.Random; import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -70,6 +78,8 @@ public class TransferTo {
private static final Random RND = RandomFactory.getRandom(); private static final Random RND = RandomFactory.getRandom();
private static final Path CWD = Path.of(".");
/* /*
* Provides test scenarios, i. e. combinations of input and output streams to be tested. * Provides test scenarios, i. e. combinations of input and output streams to be tested.
*/ */
@ -79,6 +89,12 @@ public class TransferTo {
// tests FileChannel.transferTo(FileChannel) optimized case // tests FileChannel.transferTo(FileChannel) optimized case
{ fileChannelInput(), fileChannelOutput() }, { fileChannelInput(), fileChannelOutput() },
// tests FileChannel.transferTo(SelectableChannelOutput) optimized case
{ fileChannelInput(), selectableChannelOutput() },
// tests FileChannel.transferTo(WritableChannelOutput) optimized case
{ fileChannelInput(), writableByteChannelOutput() },
// tests InputStream.transferTo(OutputStream) default case // tests InputStream.transferTo(OutputStream) default case
{ readableByteChannelInput(), defaultOutput() } { readableByteChannelInput(), defaultOutput() }
}; };
@ -138,10 +154,10 @@ public class TransferTo {
*/ */
@Test @Test
public void testMoreThanTwoGB() throws IOException { public void testMoreThanTwoGB() throws IOException {
Path sourceFile = Files.createTempFile("test2GBSource", null); Path sourceFile = Files.createTempFile(CWD, "test2GBSource", null);
try { try {
// preparing two temporary files which will be compared at the end of the test // preparing two temporary files which will be compared at the end of the test
Path targetFile = Files.createTempFile("test2GBtarget", null); Path targetFile = Files.createTempFile(CWD, "test2GBtarget", null);
try { try {
// writing 3 GB of random bytes into source file // writing 3 GB of random bytes into source file
for (int i = 0; i < NUM_WRITES; i++) for (int i = 0; i < NUM_WRITES; i++)
@ -169,6 +185,37 @@ public class TransferTo {
} }
} }
/*
* Special test whether selectable channel based transfer throws blocking mode exception.
*/
@Test
public void testIllegalBlockingMode() throws IOException {
Pipe pipe = Pipe.open();
try {
// testing arbitrary input (here: empty file) to non-blocking selectable output
try (FileChannel fc = FileChannel.open(Files.createTempFile(CWD, "testIllegalBlockingMode", null));
InputStream is = Channels.newInputStream(fc);
SelectableChannel sc = pipe.sink().configureBlocking(false);
OutputStream os = Channels.newOutputStream((WritableByteChannel) sc)) {
// IllegalBlockingMode must be thrown when trying to perform a transfer
assertThrows(IllegalBlockingModeException.class, () -> is.transferTo(os));
}
// testing non-blocking selectable input to arbitrary output (here: byte array)
try (SelectableChannel sc = pipe.source().configureBlocking(false);
InputStream is = Channels.newInputStream((ReadableByteChannel) sc);
OutputStream os = new ByteArrayOutputStream()) {
// IllegalBlockingMode must be thrown when trying to perform a transfer
assertThrows(IllegalBlockingModeException.class, () -> is.transferTo(os));
}
} finally {
pipe.source().close();
pipe.sink().close();
}
}
/* /*
* Asserts that the transferred content is correct, i. e. compares the actually transferred bytes * Asserts that the transferred content is correct, i. e. compares the actually transferred bytes
* to the expected assumption. The position of the input and output stream before the transfer is * to the expected assumption. The position of the input and output stream before the transfer is
@ -242,7 +289,7 @@ public class TransferTo {
return new InputStreamProvider() { return new InputStreamProvider() {
@Override @Override
public InputStream input(byte... bytes) throws Exception { public InputStream input(byte... bytes) throws Exception {
Path path = Files.createTempFile(null, null); Path path = Files.createTempFile(CWD, "fileChannelInput", null);
Files.write(path, bytes); Files.write(path, bytes);
FileChannel fileChannel = FileChannel.open(path); FileChannel fileChannel = FileChannel.open(path);
return Channels.newInputStream(fileChannel); return Channels.newInputStream(fileChannel);
@ -268,7 +315,7 @@ public class TransferTo {
private static OutputStreamProvider fileChannelOutput() { private static OutputStreamProvider fileChannelOutput() {
return new OutputStreamProvider() { return new OutputStreamProvider() {
public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception { public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception {
Path path = Files.createTempFile(null, null); Path path = Files.createTempFile(CWD, "fileChannelOutput", null);
FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE); FileChannel fileChannel = FileChannel.open(path, StandardOpenOption.WRITE);
spy.accept(() -> { spy.accept(() -> {
try { try {
@ -282,4 +329,39 @@ public class TransferTo {
}; };
} }
private static OutputStreamProvider selectableChannelOutput() throws IOException {
return new OutputStreamProvider() {
public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception {
Pipe pipe = Pipe.open();
Future<byte[]> bytes = CompletableFuture.supplyAsync(() -> {
try {
InputStream is = Channels.newInputStream(pipe.source());
return is.readAllBytes();
} catch (IOException e) {
throw new AssertionError("Exception while asserting content", e);
}
});
final OutputStream os = Channels.newOutputStream(pipe.sink());
spy.accept(() -> {
try {
os.close();
return bytes.get();
} catch (IOException | InterruptedException | ExecutionException e) {
throw new AssertionError("Exception while asserting content", e);
}
});
return os;
}
};
}
private static OutputStreamProvider writableByteChannelOutput() {
return new OutputStreamProvider() {
public OutputStream output(Consumer<Supplier<byte[]>> spy) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
spy.accept(outputStream::toByteArray);
return Channels.newOutputStream(Channels.newChannel(outputStream));
}
};
}
} }