8310902: (fc) FileChannel.transferXXX async close and interrupt issues

Reviewed-by: bpb
This commit is contained in:
Alan Bateman 2023-06-29 05:42:19 +00:00
parent cf8d706300
commit f4b900b607
3 changed files with 772 additions and 201 deletions

View file

@ -35,6 +35,7 @@ import java.lang.ref.Cleaner.Cleanable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.Channel;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
@ -214,6 +215,7 @@ public class FileChannelImpl
}
}
@Override
public int read(ByteBuffer dst) throws IOException {
ensureOpen();
if (!readable)
@ -245,6 +247,7 @@ public class FileChannelImpl
}
}
@Override
public long read(ByteBuffer[] dsts, int offset, int length)
throws IOException
{
@ -280,6 +283,7 @@ public class FileChannelImpl
}
}
@Override
public int write(ByteBuffer src) throws IOException {
ensureOpen();
if (!writable)
@ -312,6 +316,7 @@ public class FileChannelImpl
}
}
@Override
public long write(ByteBuffer[] srcs, int offset, int length)
throws IOException
{
@ -348,6 +353,7 @@ public class FileChannelImpl
// -- Other operations --
@Override
public long position() throws IOException {
ensureOpen();
synchronized (positionLock) {
@ -377,6 +383,7 @@ public class FileChannelImpl
}
}
@Override
public FileChannel position(long newPosition) throws IOException {
ensureOpen();
if (newPosition < 0)
@ -406,6 +413,7 @@ public class FileChannelImpl
}
}
@Override
public long size() throws IOException {
ensureOpen();
synchronized (positionLock) {
@ -433,6 +441,7 @@ public class FileChannelImpl
}
}
@Override
public FileChannel truncate(long newSize) throws IOException {
ensureOpen();
if (newSize < 0)
@ -510,6 +519,7 @@ public class FileChannelImpl
}
}
@Override
public void force(boolean metaData) throws IOException {
ensureOpen();
int rv = -1;
@ -534,110 +544,203 @@ public class FileChannelImpl
}
}
// Assume at first that the underlying kernel supports sendfile();
// Assume at first that the underlying kernel supports sendfile/equivalent;
// set this to true if we find out later that it doesn't
//
private static volatile boolean transferToNotSupported;
private static volatile boolean transferToDirectNotSupported;
// Assume that the underlying kernel sendfile() will work if the target
// fd is a pipe; set this to false if we find out later that it doesn't
// Assume at first that the underlying kernel supports copy_file_range/equivalent;
// set this to true if we find out later that it doesn't
//
private static volatile boolean pipeSupported = true;
private static volatile boolean transferFromDirectNotSupported;
// Assume that the underlying kernel sendfile() will work if the target
// fd is a file; set this to false if we find out later that it doesn't
//
private static volatile boolean fileSupported = true;
private long transferToDirectlyInternal(long position, int icount,
WritableByteChannel target,
FileDescriptor targetFD)
throws IOException
{
assert !nd.transferToDirectlyNeedsPositionLock() ||
Thread.holdsLock(positionLock);
long n = -1;
int ti = -1;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return -1;
boolean append = fdAccess.getAppend(targetFD);
do {
long comp = Blocker.begin();
try {
n = nd.transferTo(fd, position, icount, targetFD, append);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNSUPPORTED_CASE) {
if (target instanceof SinkChannelImpl)
pipeSupported = false;
if (target instanceof FileChannelImpl)
fileSupported = false;
return IOStatus.UNSUPPORTED_CASE;
}
if (n == IOStatus.UNSUPPORTED) {
// Don't bother trying again
transferToNotSupported = true;
return IOStatus.UNSUPPORTED;
}
return IOStatus.normalize(n);
} finally {
/**
* Marks the beginning of a transfer to or from this channel.
* @throws ClosedChannelException if channel is closed
*/
private int beforeTransfer() throws ClosedChannelException {
int ti = threads.add();
if (isOpen()) {
return ti;
} else {
threads.remove(ti);
end (n > -1);
throw new ClosedChannelException();
}
}
private long transferToDirectly(long position, int icount,
WritableByteChannel target)
/**
* Marks the end of a transfer to or from this channel.
* @throws AsynchronousCloseException if not completed and the channel is closed
*/
private void afterTransfer(boolean completed, int ti) throws AsynchronousCloseException {
threads.remove(ti);
if (!completed && !isOpen()) {
throw new AsynchronousCloseException();
}
}
/**
* Invoked when ClosedChannelException is thrown during a transfer. This method
* translates it to an AsynchronousCloseException or ClosedByInterruptException. In
* the case of ClosedByInterruptException, it ensures that this channel and the
* source/target channel are closed.
*/
private AsynchronousCloseException transferFailed(ClosedChannelException cce, Channel other) {
ClosedByInterruptException cbie = null;
if (cce instanceof ClosedByInterruptException e) {
assert Thread.currentThread().isInterrupted();
cbie = e;
} else if (!uninterruptible && Thread.currentThread().isInterrupted()) {
cbie = new ClosedByInterruptException();
}
if (cbie != null) {
try {
this.close();
} catch (IOException ioe) {
cbie.addSuppressed(ioe);
}
try {
other.close();
} catch (IOException ioe) {
cbie.addSuppressed(ioe);
}
return cbie;
}
// one of the channels was closed during the transfer
if (cce instanceof AsynchronousCloseException ace) {
return ace;
} else {
var ace = new AsynchronousCloseException();
ace.addSuppressed(cce);
return ace;
}
}
/**
* Transfers bytes from this channel's file to the resource that the given file
* descriptor is connected to.
*/
private long transferToFileDescriptor(long position, int count, FileDescriptor targetFD) {
long n;
boolean append = fdAccess.getAppend(targetFD);
do {
long comp = Blocker.begin();
try {
n = nd.transferTo(fd, position, count, targetFD, append);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return n;
}
/**
* Transfers bytes from this channel's file to the given channel's file.
*/
private long transferToFileChannel(long position, int count, FileChannelImpl target)
throws IOException
{
if (transferToNotSupported)
return IOStatus.UNSUPPORTED;
FileDescriptor targetFD = null;
if (target instanceof FileChannelImpl) {
if (!fileSupported)
return IOStatus.UNSUPPORTED_CASE;
targetFD = ((FileChannelImpl)target).fd;
} else if (target instanceof SelChImpl) {
// Direct transfer to pipe causes EINVAL on some configurations
if ((target instanceof SinkChannelImpl) && !pipeSupported)
return IOStatus.UNSUPPORTED_CASE;
// Platform-specific restrictions. Now there is only one:
// Direct transfer to non-blocking channel could be forbidden
SelectableChannel sc = (SelectableChannel)target;
if (!nd.canTransferToDirectly(sc))
return IOStatus.UNSUPPORTED_CASE;
targetFD = ((SelChImpl)target).getFD();
final FileChannelImpl source = this;
boolean completed = false;
try {
beginBlocking();
int sourceIndex = source.beforeTransfer();
try {
int targetIndex = target.beforeTransfer();
try {
long n = transferToFileDescriptor(position, count, target.fd);
completed = (n >= 0);
return IOStatus.normalize(n);
} finally {
target.afterTransfer(completed, targetIndex);
}
} finally {
source.afterTransfer(completed, sourceIndex);
}
} finally {
endBlocking(completed);
}
}
if (targetFD == null)
return IOStatus.UNSUPPORTED;
int thisFDVal = IOUtil.fdVal(fd);
int targetFDVal = IOUtil.fdVal(targetFD);
if (thisFDVal == targetFDVal) // Not supported on some configurations
return IOStatus.UNSUPPORTED;
/**
* Transfers bytes from this channel's file to the given channel's socket.
*/
private long transferToSocketChannel(long position, int count, SocketChannelImpl target)
throws IOException
{
final FileChannelImpl source = this;
boolean completed = false;
try {
beginBlocking();
int sourceIndex = source.beforeTransfer();
try {
target.beforeTransferTo();
try {
long n = transferToFileDescriptor(position, count, target.getFD());
completed = (n >= 0);
return IOStatus.normalize(n);
} finally {
target.afterTransferTo(completed);
}
} finally {
source.afterTransfer(completed, sourceIndex);
}
} finally {
endBlocking(completed);
}
}
/**
* Transfers bytes from this channel's file from the given channel's file. This
* implementation uses sendfile, copy_file_range or equivalent.
*/
private long transferToDirectInternal(long position, int count, WritableByteChannel target)
throws IOException
{
assert !nd.transferToDirectlyNeedsPositionLock() || Thread.holdsLock(positionLock);
return switch (target) {
case FileChannelImpl fci -> transferToFileChannel(position, count, fci);
case SocketChannelImpl sci -> transferToSocketChannel(position, count, sci);
default -> IOStatus.UNSUPPORTED_CASE;
};
}
/**
* Transfers bytes from this channel's file to the given channel's file or socket.
* @return the number of bytes transferred, UNSUPPORTED_CASE if this transfer cannot
* be done directly, or UNSUPPORTED if there is no direct support
*/
private long transferToDirect(long position, int count, WritableByteChannel target)
throws IOException
{
if (transferToDirectNotSupported)
return IOStatus.UNSUPPORTED;
if (target instanceof SelectableChannel sc && !nd.canTransferToDirectly(sc))
return IOStatus.UNSUPPORTED_CASE;
long n;
if (nd.transferToDirectlyNeedsPositionLock()) {
synchronized (positionLock) {
long pos = position();
try {
return transferToDirectlyInternal(position, icount,
target, targetFD);
n = transferToDirectInternal(position, count, target);
} finally {
position(pos);
try {
position(pos);
} catch (ClosedChannelException ignore) {
// can't reset position if channel is closed
}
}
}
} else {
return transferToDirectlyInternal(position, icount, target, targetFD);
n = transferToDirectInternal(position, count, target);
}
if (n == IOStatus.UNSUPPORTED) {
transferToDirectNotSupported = true;
}
return n;
}
// Size threshold above which to use a mapped buffer;
@ -648,6 +751,10 @@ public class FileChannelImpl
// Maximum size to map when using a mapped buffer
private static final long MAPPED_TRANSFER_SIZE = 8L*1024L*1024L;
/**
* Transfers bytes from channel's file to the given channel. This implementation
* memory maps this channel's file.
*/
private long transferToTrustedChannel(long position, long count,
WritableByteChannel target)
throws IOException
@ -657,25 +764,24 @@ public class FileChannelImpl
boolean isSelChImpl = (target instanceof SelChImpl);
if (!((target instanceof FileChannelImpl) || isSelChImpl))
return IOStatus.UNSUPPORTED;
return IOStatus.UNSUPPORTED_CASE;
if (target == this) {
long posThis = position();
if (posThis - count + 1 <= position &&
position - count + 1 <= posThis &&
!nd.canTransferToFromOverlappedMap()) {
if ((posThis - count + 1 <= position)
&& (position - count + 1 <= posThis)
&& !nd.canTransferToFromOverlappedMap()) {
return IOStatus.UNSUPPORTED_CASE;
}
}
// Trusted target: Use a mapped buffer
long remaining = count;
while (remaining > 0L) {
long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
try {
MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
try {
// ## Bug: Closing this channel will not terminate the write
// write may block, closing this channel will not wake it up
int n = target.write(dbb);
assert n >= 0;
remaining -= n;
@ -688,16 +794,6 @@ public class FileChannelImpl
} finally {
unmap(dbb);
}
} catch (ClosedByInterruptException e) {
// target closed by interrupt as ClosedByInterruptException needs
// to be thrown after closing this channel.
assert !target.isOpen();
try {
close();
} catch (Throwable suppressed) {
e.addSuppressed(suppressed);
}
throw e;
} catch (IOException ioe) {
// Only throw exception if no bytes have been written
if (remaining == count)
@ -708,24 +804,26 @@ public class FileChannelImpl
return count - remaining;
}
/**
* Transfers bytes from channel's file to the given channel.
*/
private long transferToArbitraryChannel(long position, long count,
WritableByteChannel target)
throws IOException
{
// Untrusted target: Use a newly-erased buffer
int c = (int)Math.min(count, TRANSFER_SIZE);
int c = (int) Math.min(count, TRANSFER_SIZE);
ByteBuffer bb = ByteBuffer.allocate(c);
long tw = 0; // Total bytes written
long pos = position;
try {
while (tw < count) {
bb.limit((int)Math.min(count - tw, TRANSFER_SIZE));
bb.limit((int) Math.min(count - tw, TRANSFER_SIZE));
int nr = read(bb, pos);
if (nr <= 0)
break;
bb.flip();
// ## Bug: Will block writing target if this channel
// ## is asynchronously closed
// write may block, closing this channel will not wake it up
int nw = target.write(bb);
tw += nw;
if (nw != nr)
@ -741,8 +839,8 @@ public class FileChannelImpl
}
}
public long transferTo(long position, long count,
WritableByteChannel target)
@Override
public long transferTo(long position, long count, WritableByteChannel target)
throws IOException
{
ensureOpen();
@ -750,101 +848,109 @@ public class FileChannelImpl
throw new ClosedChannelException();
if (!readable)
throw new NonReadableChannelException();
if (target instanceof FileChannelImpl &&
!((FileChannelImpl)target).writable)
if (target instanceof FileChannelImpl && !((FileChannelImpl) target).writable)
throw new NonWritableChannelException();
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
final long sz = size();
if (position > sz)
return 0;
// Now position <= sz so remaining >= 0 and
// remaining == 0 if and only if sz == 0
long remaining = sz - position;
try {
final long sz = size();
if (position > sz)
return 0;
// Adjust count only if remaining > 0, i.e.,
// sz > position which means sz > 0
if (remaining > 0 && remaining < count)
count = remaining;
// Now position <= sz so remaining >= 0 and
// remaining == 0 if and only if sz == 0
long remaining = sz - position;
// System calls supporting fast transfers might not work on files
// which advertise zero size such as those in Linux /proc
if (sz > 0) {
// Attempt a direct transfer, if the kernel supports it, limiting
// the number of bytes according to which platform
int icount = (int)Math.min(count, nd.maxDirectTransferSize());
long n;
if ((n = transferToDirectly(position, icount, target)) >= 0)
return n;
// Adjust count only if remaining > 0, i.e.,
// sz > position which means sz > 0
if (remaining > 0 && remaining < count)
count = remaining;
// Attempt a mapped transfer, but only to trusted channel types
if ((n = transferToTrustedChannel(position, count, target)) >= 0)
return n;
// System calls supporting fast transfers might not work on files
// which advertise zero size such as those in Linux /proc
if (sz > 0) {
// Attempt a direct transfer, if the kernel supports it, limiting
// the number of bytes according to which platform
int icount = (int) Math.min(count, nd.maxDirectTransferSize());
long n;
if ((n = transferToDirect(position, icount, target)) >= 0)
return n;
// Attempt a mapped transfer, but only to trusted channel types
if ((n = transferToTrustedChannel(position, count, target)) >= 0)
return n;
}
// fallback to read/write loop
return transferToArbitraryChannel(position, count, target);
} catch (ClosedChannelException e) {
// throw AsynchronousCloseException or ClosedByInterruptException
throw transferFailed(e, target);
}
// Slow path for untrusted targets
return transferToArbitraryChannel(position, count, target);
}
// Assume at first that the underlying kernel supports copy_file_range();
// set this to true if we find out later that it doesn't
//
private static volatile boolean transferFromNotSupported;
/**
* Transfers bytes into this channel's file from the resource that the given file
* descriptor is connected to.
*/
private long transferFromFileDescriptor(FileDescriptor srcFD, long position, long count) {
long n;
boolean append = fdAccess.getAppend(fd);
do {
long comp = Blocker.begin();
try {
n = nd.transferFrom(srcFD, fd, position, count, append);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return n;
}
private long transferFromDirectlyInternal(FileDescriptor srcFD,
long position, long count)
/**
* Transfers bytes into this channel's file from the given channel's file. This
* implementation uses copy_file_range or equivalent.
*/
private long transferFromDirect(FileChannelImpl src, long position, long count)
throws IOException
{
long n = -1;
int ti = -1;
if (transferFromDirectNotSupported)
return IOStatus.UNSUPPORTED;
final FileChannelImpl target = this;
boolean completed = false;
try {
beginBlocking();
ti = threads.add();
if (!isOpen())
return -1;
do {
long comp = Blocker.begin();
int srcIndex = src.beforeTransfer();
try {
int targetIndex = target.beforeTransfer();
try {
boolean append = fdAccess.getAppend(fd);
n = nd.transferFrom(srcFD, fd, position, count, append);
long n = transferFromFileDescriptor(src.fd, position, count);
if (n == IOStatus.UNSUPPORTED) {
transferFromDirectNotSupported = true;
return IOStatus.UNSUPPORTED;
}
completed = (n >= 0);
return IOStatus.normalize(n);
} finally {
Blocker.end(comp);
target.afterTransfer(completed, targetIndex);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNSUPPORTED) {
// Don't bother trying again
transferFromNotSupported = true;
return IOStatus.UNSUPPORTED;
} finally {
src.afterTransfer(completed, srcIndex);
}
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
end (n > -1);
endBlocking(completed);
}
}
private long transferFromDirectly(FileChannelImpl src,
long position, long count)
/**
* Transfers bytes into this channel's file from the given channel's file. This
* implementation memory maps the given channel's file.
*/
private long transferFromFileChannel(FileChannelImpl src, long position, long count)
throws IOException
{
if (!src.readable)
throw new NonReadableChannelException();
if (transferFromNotSupported)
return IOStatus.UNSUPPORTED;
FileDescriptor srcFD = src.fd;
if (srcFD == null)
return IOStatus.UNSUPPORTED_CASE;
return transferFromDirectlyInternal(srcFD, position, count);
}
private long transferFromFileChannel(FileChannelImpl src,
long position, long count)
throws IOException
{
if (!src.readable)
throw new NonReadableChannelException();
if (count < MAPPED_TRANSFER_THRESHOLD)
return IOStatus.UNSUPPORTED_CASE;
@ -852,19 +958,17 @@ public class FileChannelImpl
long pos = src.position();
long max = Math.min(count, src.size() - pos);
if (src == this) {
if (position() - max + 1 <= pos &&
pos - max + 1 <= position() &&
!nd.canTransferToFromOverlappedMap()) {
return IOStatus.UNSUPPORTED_CASE;
}
if (src == this
&& (position() - max + 1 <= pos)
&& (pos - max + 1 <= position())
&& !nd.canTransferToFromOverlappedMap()) {
return IOStatus.UNSUPPORTED_CASE;
}
long remaining = max;
long p = pos;
while (remaining > 0L) {
long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
// ## Bug: Closing this channel will not terminate the write
MappedByteBuffer bb = src.map(MapMode.READ_ONLY, p, size);
try {
long n = write(bb, position);
@ -889,20 +993,21 @@ public class FileChannelImpl
private static final int TRANSFER_SIZE = 8192;
/**
* Transfers bytes into this channel's file from the given channel.
*/
private long transferFromArbitraryChannel(ReadableByteChannel src,
long position, long count)
throws IOException
{
// Untrusted target: Use a newly-erased buffer
int c = (int)Math.min(count, TRANSFER_SIZE);
int c = (int) Math.min(count, TRANSFER_SIZE);
ByteBuffer bb = ByteBuffer.allocate(c);
long tw = 0; // Total bytes written
long pos = position;
try {
while (tw < count) {
bb.limit((int)Math.min((count - tw), (long)TRANSFER_SIZE));
// ## Bug: Will block reading src if this channel
// ## is asynchronously closed
bb.limit((int) Math.min((count - tw), TRANSFER_SIZE));
// read may block, closing this channel will not wake it up
int nr = src.read(bb);
if (nr <= 0)
break;
@ -922,8 +1027,8 @@ public class FileChannelImpl
}
}
public long transferFrom(ReadableByteChannel src,
long position, long count)
@Override
public long transferFrom(ReadableByteChannel src, long position, long count)
throws IOException
{
ensureOpen();
@ -936,19 +1041,26 @@ public class FileChannelImpl
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
// System calls supporting fast transfers might not work on files
// which advertise zero size such as those in Linux /proc
if (src instanceof FileChannelImpl fci && fci.size() > 0) {
long n;
if ((n = transferFromDirectly(fci, position, count)) >= 0)
return n;
if ((n = transferFromFileChannel(fci, position, count)) >= 0)
return n;
}
try {
// System calls supporting fast transfers might not work on files
// which advertise zero size such as those in Linux /proc
if (src instanceof FileChannelImpl fci && fci.size() > 0) {
long n;
if ((n = transferFromDirect(fci, position, count)) >= 0)
return n;
if ((n = transferFromFileChannel(fci, position, count)) >= 0)
return n;
}
return transferFromArbitraryChannel(src, position, count);
// fallback to read/write loop
return transferFromArbitraryChannel(src, position, count);
} catch (ClosedChannelException e) {
// throw AsynchronousCloseException or ClosedByInterruptException
throw transferFailed(e, src);
}
}
@Override
public int read(ByteBuffer dst, long position) throws IOException {
if (dst == null)
throw new NullPointerException();
@ -994,6 +1106,7 @@ public class FileChannelImpl
}
}
@Override
public int write(ByteBuffer src, long position) throws IOException {
if (src == null)
throw new NullPointerException();
@ -1178,6 +1291,7 @@ public class FileChannelImpl
private static final int MAP_RW = 1;
private static final int MAP_PV = 2;
@Override
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
if (size > Integer.MAX_VALUE)
throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE");
@ -1205,9 +1319,8 @@ public class FileChannelImpl
}
@Override
public MemorySegment map(MapMode mode, long offset, long size,
Arena arena)
throws IOException
public MemorySegment map(MapMode mode, long offset, long size, Arena arena)
throws IOException
{
Objects.requireNonNull(mode,"Mode is null");
Objects.requireNonNull(arena, "Arena is null");
@ -1460,6 +1573,7 @@ public class FileChannelImpl
return fileLockTable;
}
@Override
public FileLock lock(long position, long size, boolean shared)
throws IOException
{
@ -1512,6 +1626,7 @@ public class FileChannelImpl
return fli;
}
@Override
public FileLock tryLock(long position, long size, boolean shared)
throws IOException
{