8198562: (ch) Separate blocking and non-blocking code paths (part 1)

8198754: (ch) Separate blocking and non-blocking code paths (part 2)

Reviewed-by: bpb
This commit is contained in:
Alan Bateman 2018-02-28 09:54:38 +00:00
parent 3918ed17a5
commit 13dd8888d2
16 changed files with 1645 additions and 1279 deletions

View file

@ -35,6 +35,7 @@ import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.SelectionKey;
@ -43,6 +44,7 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
@ -56,7 +58,6 @@ class ServerSocketChannelImpl
extends ServerSocketChannel
implements SelChImpl
{
// Used to make native close and configure calls
private static NativeDispatcher nd;
@ -64,10 +65,7 @@ class ServerSocketChannelImpl
private final FileDescriptor fd;
private final int fdVal;
// ID of native thread currently blocked in this channel, for signalling
private volatile long thread;
// Lock held by thread currently blocked in this channel
// Lock held by thread currently blocked on this channel
private final ReentrantLock acceptLock = new ReentrantLock();
// Lock held by any thread that modifies the state fields declared below
@ -77,10 +75,14 @@ class ServerSocketChannelImpl
// -- The following fields are protected by stateLock
// Channel state, increases monotonically
private static final int ST_UNINITIALIZED = -1;
private static final int ST_INUSE = 0;
private static final int ST_KILLED = 1;
private int state = ST_UNINITIALIZED;
private static final int ST_CLOSING = 1;
private static final int ST_KILLPENDING = 2;
private static final int ST_KILLED = 3;
private int state;
// ID of native thread currently blocked in this channel, for signalling
private long thread;
// Binding
private InetSocketAddress localAddress; // null => unbound
@ -98,22 +100,28 @@ class ServerSocketChannelImpl
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
ServerSocketChannelImpl(SelectorProvider sp,
FileDescriptor fd,
boolean bound)
ServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
throws IOException
{
super(sp);
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
if (bound)
localAddress = Net.localAddress(fd);
if (bound) {
synchronized (stateLock) {
localAddress = Net.localAddress(fd);
}
}
}
// @throws ClosedChannelException if channel is closed
private void ensureOpen() throws ClosedChannelException {
if (!isOpen())
throw new ClosedChannelException();
}
@Override
public ServerSocket socket() {
synchronized (stateLock) {
if (socket == null)
@ -125,11 +133,10 @@ class ServerSocketChannelImpl
@Override
public SocketAddress getLocalAddress() throws IOException {
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
return localAddress == null ? localAddress
: Net.getRevealedLocalAddress(
Net.asInetSocketAddress(localAddress));
ensureOpen();
return (localAddress == null)
? null
: Net.getRevealedLocalAddress(localAddress);
}
}
@ -137,13 +144,11 @@ class ServerSocketChannelImpl
public <T> ServerSocketChannel setOption(SocketOption<T> name, T value)
throws IOException
{
if (name == null)
throw new NullPointerException();
Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
ProtocolFamily family = Net.isIPv6Available() ?
@ -152,9 +157,7 @@ class ServerSocketChannelImpl
return this;
}
if (name == StandardSocketOptions.SO_REUSEADDR &&
Net.useExclusiveBind())
{
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
isReuseAddress = (Boolean)value;
} else {
@ -170,17 +173,13 @@ class ServerSocketChannelImpl
public <T> T getOption(SocketOption<T> name)
throws IOException
{
if (name == null)
throw new NullPointerException();
Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
if (!isOpen())
throw new ClosedChannelException();
if (name == StandardSocketOptions.SO_REUSEADDR &&
Net.useExclusiveBind())
{
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
return (T)Boolean.valueOf(isReuseAddress);
}
@ -193,7 +192,7 @@ class ServerSocketChannelImpl
static final Set<SocketOption<?>> defaultOptions = defaultOptions();
private static Set<SocketOption<?>> defaultOptions() {
HashSet<SocketOption<?>> set = new HashSet<>(2);
HashSet<SocketOption<?>> set = new HashSet<>();
set.add(StandardSocketOptions.SO_RCVBUF);
set.add(StandardSocketOptions.SO_REUSEADDR);
if (Net.isReusePortAvailable()) {
@ -209,35 +208,23 @@ class ServerSocketChannelImpl
return DefaultOptionsHolder.defaultOptions;
}
public boolean isBound() {
synchronized (stateLock) {
return localAddress != null;
}
}
public InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
}
}
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
acceptLock.lock();
try {
if (!isOpen())
throw new ClosedChannelException();
if (isBound())
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
InetSocketAddress isa = (local == null)
? new InetSocketAddress(0)
: Net.checkAddress(local);
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
Net.bind(fd, isa.getAddress(), isa.getPort());
Net.listen(fd, backlog < 1 ? 50 : backlog);
localAddress = Net.localAddress(fd);
}
} finally {
@ -246,47 +233,78 @@ class ServerSocketChannelImpl
return this;
}
/**
* Marks the beginning of an I/O operation that might block.
*
* @throws ClosedChannelException if the channel is closed
* @throws NotYetBoundException if the channel's socket has not been bound yet
*/
private void begin(boolean blocking) throws ClosedChannelException {
if (blocking)
begin(); // set blocker to close channel if interrupted
synchronized (stateLock) {
ensureOpen();
if (localAddress == null)
throw new NotYetBoundException();
if (blocking)
thread = NativeThread.current();
}
}
/**
* Marks the end of an I/O operation that may have blocked.
*
* @throws AsynchronousCloseException if the channel was closed due to this
* thread being interrupted on a blocking I/O operation.
*/
private void end(boolean blocking, boolean completed)
throws AsynchronousCloseException
{
if (blocking) {
synchronized (stateLock) {
thread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
stateLock.notifyAll();
}
}
end(completed);
}
}
@Override
public SocketChannel accept() throws IOException {
acceptLock.lock();
try {
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
boolean blocking = isBlocking();
try {
begin();
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
begin(blocking);
do {
n = accept(this.fd, newfd, isaa);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
break;
}
} while (n == IOStatus.INTERRUPTED && isOpen());
} finally {
thread = 0;
end(n > 0);
end(blocking, n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
sc = new SocketChannelImpl(provider(), newfd, isa);
SocketChannel sc = new SocketChannelImpl(provider(), newfd, isa);
// check permitted to accept connections from the remote address
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
@ -299,33 +317,133 @@ class ServerSocketChannelImpl
}
}
@Override
protected void implConfigureBlocking(boolean block) throws IOException {
IOUtil.configureBlocking(fd, block);
}
protected void implCloseSelectableChannel() throws IOException {
synchronized (stateLock) {
if (state != ST_KILLED)
nd.preClose(fd);
long th = thread;
if (th != 0)
NativeThread.signal(th);
if (!isRegistered())
kill();
acceptLock.lock();
try {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, block);
}
} finally {
acceptLock.unlock();
}
}
/**
* Invoked by implCloseChannel to close the channel.
*
* This method waits for outstanding I/O operations to complete. When in
* blocking mode, the socket is pre-closed and the threads in blocking I/O
* operations are signalled to ensure that the outstanding I/O operations
* complete quickly.
*
* The socket is closed by this method when it is not registered with a
* Selector. Note that a channel configured blocking may be registered with
* a Selector. This arises when a key is canceled and the channel configured
* to blocking mode before the key is flushed from the Selector.
*/
@Override
protected void implCloseSelectableChannel() throws IOException {
assert !isOpen();
boolean interrupted = false;
boolean blocking;
// set state to ST_CLOSING
synchronized (stateLock) {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
}
// wait for any outstanding accept to complete
if (blocking) {
synchronized (stateLock) {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
nd.preClose(fd);
NativeThread.signal(th);
// wait for accept operation to end
while (thread != 0) {
try {
stateLock.wait();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
}
} else {
// non-blocking mode: wait for accept to complete
acceptLock.lock();
acceptLock.unlock();
}
// set state to ST_KILLPENDING
synchronized (stateLock) {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
}
// close socket if not registered with Selector
if (!isRegistered())
kill();
// restore interrupt status
if (interrupted)
Thread.currentThread().interrupt();
}
@Override
public void kill() throws IOException {
synchronized (stateLock) {
if (state == ST_KILLED)
return;
if (state == ST_UNINITIALIZED) {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
return;
nd.close(fd);
}
assert !isOpen() && !isRegistered();
nd.close(fd);
state = ST_KILLED;
}
}
/**
* Returns true if channel's socket is bound
*/
boolean isBound() {
synchronized (stateLock) {
return localAddress != null;
}
}
/**
* Returns the local address, or null if not bound
*/
InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
}
}
/**
* Poll this channel's socket for a new connection up to the given timeout.
* @return {@code true} if there is a connection to accept
*/
boolean pollAccept(long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && isBlocking();
acceptLock.lock();
try {
boolean polled = false;
try {
begin(true);
int n = Net.poll(fd, Net.POLLIN, timeout);
polled = (n > 0);
} finally {
end(true, polled);
}
return polled;
} finally {
acceptLock.unlock();
}
}
@ -367,31 +485,6 @@ class ServerSocketChannelImpl
return translateReadyOps(ops, 0, sk);
}
// package-private
int poll(int events, long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && !isBlocking();
acceptLock.lock();
try {
int n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen())
return 0;
thread = NativeThread.current();
}
n = Net.poll(fd, events, timeout);
} finally {
thread = 0;
end(n > 0);
}
return n;
} finally {
acceptLock.unlock();
}
}
/**
* Translates an interest operation set into a native poll event set
*/
@ -421,7 +514,7 @@ class ServerSocketChannelImpl
sb.append("closed");
} else {
synchronized (stateLock) {
InetSocketAddress addr = localAddress();
InetSocketAddress addr = localAddress;
if (addr == null) {
sb.append("unbound");
} else {
@ -438,7 +531,8 @@ class ServerSocketChannelImpl
*
* @implNote Wrap native call to allow instrumentation.
*/
private int accept(FileDescriptor ssfd, FileDescriptor newfd,
private int accept(FileDescriptor ssfd,
FileDescriptor newfd,
InetSocketAddress[] isaa)
throws IOException
{
@ -452,7 +546,8 @@ class ServerSocketChannelImpl
// Returns 1 on success, or IOStatus.UNAVAILABLE (if non-blocking and no
// connections are pending) or IOStatus.INTERRUPTED.
//
private native int accept0(FileDescriptor ssfd, FileDescriptor newfd,
private native int accept0(FileDescriptor ssfd,
FileDescriptor newfd,
InetSocketAddress[] isaa)
throws IOException;