8223353: (ch) Change channel close implementation to not wait for I/O threads

Reviewed-by: dfuchs, chegar
This commit is contained in:
Alan Bateman 2019-05-08 08:15:04 +01:00
parent 260ae30b14
commit 94d1d0d3d4
6 changed files with 478 additions and 669 deletions

View file

@ -35,7 +35,6 @@ import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.util.Objects;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class SinkChannelImpl
@ -54,16 +53,14 @@ class SinkChannelImpl
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
private final ReentrantLock stateLock = new ReentrantLock();
private final Condition stateCondition = stateLock.newCondition();
private final Object stateLock = new Object();
// -- The following fields are protected by stateLock
// Channel state
private static final int ST_INUSE = 0;
private static final int ST_CLOSING = 1;
private static final int ST_KILLPENDING = 2;
private static final int ST_KILLED = 3;
private static final int ST_CLOSED = 2;
private int state;
// ID of native thread doing write, for signalling
@ -86,83 +83,93 @@ class SinkChannelImpl
this.fdVal = IOUtil.fdVal(fd);
}
/**
* Closes the write end of the pipe if there are no write operation in
* progress and the channel is not registered with a Selector.
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if (thread == 0 && !isRegistered()) {
state = ST_CLOSED;
nd.close(fd);
return true;
} else {
return false;
}
}
/**
* Invokes tryClose to attempt to close the write end of the pipe.
*
* This method is used for deferred closing by I/O and Selector operations.
*/
private void tryFinishClose() {
try {
tryClose();
} catch (IOException ignore) { }
}
/**
* Closes this channel when configured in blocking mode.
*
* If there is a write operation in progress then the write-end of the pipe
* is pre-closed and the writer is signalled, in which case the final close
* is deferred until the writer aborts.
*/
private void implCloseBlockingMode() throws IOException {
synchronized (stateLock) {
assert state < ST_CLOSING;
state = ST_CLOSING;
if (!tryClose()) {
long th = thread;
if (th != 0) {
nd.preClose(fd);
NativeThread.signal(th);
}
}
}
}
/**
* Closes this channel when configured in non-blocking mode.
*
* If the channel is registered with a Selector then the close is deferred
* until the channel is flushed from all Selectors.
*/
private void implCloseNonBlockingMode() throws IOException {
synchronized (stateLock) {
assert state < ST_CLOSING;
state = ST_CLOSING;
}
// wait for any write operation to complete before trying to close
writeLock.lock();
writeLock.unlock();
synchronized (stateLock) {
if (state == ST_CLOSING) {
tryClose();
}
}
}
/**
* Invoked by implCloseChannel to close the channel.
*/
@Override
protected void implCloseSelectableChannel() throws IOException {
assert !isOpen();
boolean interrupted = false;
boolean blocking;
// set state to ST_CLOSING
stateLock.lock();
try {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
} finally {
stateLock.unlock();
}
// wait for any outstanding write to complete
if (blocking) {
stateLock.lock();
try {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
nd.preClose(fd);
NativeThread.signal(th);
// wait for write operation to end
while (thread != 0) {
try {
stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
} finally {
stateLock.unlock();
}
if (isBlocking()) {
implCloseBlockingMode();
} else {
// non-blocking mode: wait for write to complete
writeLock.lock();
writeLock.unlock();
implCloseNonBlockingMode();
}
// set state to ST_KILLPENDING
stateLock.lock();
try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
} finally {
stateLock.unlock();
}
// close socket if not registered with Selector
if (!isRegistered())
kill();
// restore interrupt status
if (interrupted)
Thread.currentThread().interrupt();
}
@Override
public void kill() throws IOException {
stateLock.lock();
try {
assert thread == 0;
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
public void kill() {
synchronized (stateLock) {
if (state == ST_CLOSING) {
tryFinishClose();
}
} finally {
stateLock.unlock();
}
}
@ -170,11 +177,10 @@ class SinkChannelImpl
protected void implConfigureBlocking(boolean block) throws IOException {
writeLock.lock();
try {
stateLock.lock();
try {
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
IOUtil.configureBlocking(fd, block);
} finally {
stateLock.unlock();
}
} finally {
writeLock.unlock();
@ -229,14 +235,11 @@ class SinkChannelImpl
// set hook for Thread.interrupt
begin();
}
stateLock.lock();
try {
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
if (blocking)
thread = NativeThread.current();
} finally {
stateLock.unlock();
}
}
@ -250,15 +253,11 @@ class SinkChannelImpl
throws AsynchronousCloseException
{
if (blocking) {
stateLock.lock();
try {
synchronized (stateLock) {
thread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
stateCondition.signalAll();
tryFinishClose();
}
} finally {
stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);

View file

@ -35,7 +35,6 @@ import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider;
import java.util.Objects;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class SourceChannelImpl
@ -54,16 +53,14 @@ class SourceChannelImpl
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
private final ReentrantLock stateLock = new ReentrantLock();
private final Condition stateCondition = stateLock.newCondition();
private final Object stateLock = new Object();
// -- The following fields are protected by stateLock
// Channel state
private static final int ST_INUSE = 0;
private static final int ST_CLOSING = 1;
private static final int ST_KILLPENDING = 2;
private static final int ST_KILLED = 3;
private static final int ST_CLOSED = 2;
private int state;
// ID of native thread doing read, for signalling
@ -86,83 +83,93 @@ class SourceChannelImpl
this.fdVal = IOUtil.fdVal(fd);
}
/**
* Closes the read end of the pipe if there are no read operation in
* progress and the channel is not registered with a Selector.
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
if (thread == 0 && !isRegistered()) {
state = ST_CLOSED;
nd.close(fd);
return true;
} else {
return false;
}
}
/**
* Invokes tryClose to attempt to close the read end of the pipe.
*
* This method is used for deferred closing by I/O and Selector operations.
*/
private void tryFinishClose() {
try {
tryClose();
} catch (IOException ignore) { }
}
/**
* Closes this channel when configured in blocking mode.
*
* If there is a read operation in progress then the read-end of the pipe
* is pre-closed and the reader is signalled, in which case the final close
* is deferred until the reader aborts.
*/
private void implCloseBlockingMode() throws IOException {
synchronized (stateLock) {
assert state < ST_CLOSING;
state = ST_CLOSING;
if (!tryClose()) {
long th = thread;
if (th != 0) {
nd.preClose(fd);
NativeThread.signal(th);
}
}
}
}
/**
* Closes this channel when configured in non-blocking mode.
*
* If the channel is registered with a Selector then the close is deferred
* until the channel is flushed from all Selectors.
*/
private void implCloseNonBlockingMode() throws IOException {
synchronized (stateLock) {
assert state < ST_CLOSING;
state = ST_CLOSING;
}
// wait for any read operation to complete before trying to close
readLock.lock();
readLock.unlock();
synchronized (stateLock) {
if (state == ST_CLOSING) {
tryClose();
}
}
}
/**
* Invoked by implCloseChannel to close the channel.
*/
@Override
protected void implCloseSelectableChannel() throws IOException {
assert !isOpen();
boolean interrupted = false;
boolean blocking;
// set state to ST_CLOSING
stateLock.lock();
try {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
} finally {
stateLock.unlock();
}
// wait for any outstanding read to complete
if (blocking) {
stateLock.lock();
try {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
nd.preClose(fd);
NativeThread.signal(th);
// wait for read operation to end
while (thread != 0) {
try {
stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
} finally {
stateLock.unlock();
}
if (isBlocking()) {
implCloseBlockingMode();
} else {
// non-blocking mode: wait for read to complete
readLock.lock();
readLock.unlock();
implCloseNonBlockingMode();
}
// set state to ST_KILLPENDING
stateLock.lock();
try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
} finally {
stateLock.unlock();
}
// close socket if not registered with Selector
if (!isRegistered())
kill();
// restore interrupt status
if (interrupted)
Thread.currentThread().interrupt();
}
@Override
public void kill() throws IOException {
stateLock.lock();
try {
assert thread == 0;
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
public void kill() {
synchronized (stateLock) {
assert !isOpen();
if (state == ST_CLOSING) {
tryFinishClose();
}
} finally {
stateLock.unlock();
}
}
@ -170,11 +177,10 @@ class SourceChannelImpl
protected void implConfigureBlocking(boolean block) throws IOException {
readLock.lock();
try {
stateLock.lock();
try {
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
IOUtil.configureBlocking(fd, block);
} finally {
stateLock.unlock();
}
} finally {
readLock.unlock();
@ -229,14 +235,11 @@ class SourceChannelImpl
// set hook for Thread.interrupt
begin();
}
stateLock.lock();
try {
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
if (blocking)
thread = NativeThread.current();
} finally {
stateLock.unlock();
}
}
@ -250,15 +253,11 @@ class SourceChannelImpl
throws AsynchronousCloseException
{
if (blocking) {
stateLock.lock();
try {
synchronized (stateLock) {
thread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
stateCondition.signalAll();
tryFinishClose();
}
} finally {
stateLock.unlock();
}
// remove hook for Thread.interrupt
end(completed);