8222774: (ch) Replace uses of stateLock and blockingLock with j.u.c. locks

Reviewed-by: dfuchs, bpb, martin
This commit is contained in:
Alan Bateman 2019-04-25 10:41:49 +01:00
parent 8322ce2e6b
commit 2998236d83
13 changed files with 1762 additions and 487 deletions

View file

@ -31,10 +31,12 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.NotYetBoundException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
@ -44,6 +46,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import sun.net.NetHooks;
@ -69,7 +72,8 @@ class ServerSocketChannelImpl
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
private final Object stateLock = new Object();
private final ReentrantLock stateLock = new ReentrantLock();
private final Condition stateCondition = stateLock.newCondition();
// -- The following fields are protected by stateLock
@ -95,7 +99,7 @@ class ServerSocketChannelImpl
// -- End of fields protected by stateLock
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
ServerSocketChannelImpl(SelectorProvider sp) {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
@ -108,8 +112,11 @@ class ServerSocketChannelImpl
this.fd = fd;
this.fdVal = IOUtil.fdVal(fd);
if (bound) {
synchronized (stateLock) {
stateLock.lock();
try {
localAddress = Net.localAddress(fd);
} finally {
stateLock.unlock();
}
}
}
@ -122,20 +129,26 @@ class ServerSocketChannelImpl
@Override
public ServerSocket socket() {
synchronized (stateLock) {
stateLock.lock();
try {
if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
} finally {
stateLock.unlock();
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
synchronized (stateLock) {
stateLock.lock();
try {
ensureOpen();
return (localAddress == null)
? null
: Net.getRevealedLocalAddress(localAddress);
} finally {
stateLock.unlock();
}
}
@ -146,7 +159,8 @@ class ServerSocketChannelImpl
Objects.requireNonNull(name);
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
stateLock.lock();
try {
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
@ -157,6 +171,8 @@ class ServerSocketChannelImpl
Net.setSocketOption(fd, Net.UNSPEC, name, value);
}
return this;
} finally {
stateLock.unlock();
}
}
@ -169,7 +185,8 @@ class ServerSocketChannelImpl
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
synchronized (stateLock) {
stateLock.lock();
try {
ensureOpen();
if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
// SO_REUSEADDR emulated when using exclusive bind
@ -177,6 +194,8 @@ class ServerSocketChannelImpl
}
// no options that require special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
} finally {
stateLock.unlock();
}
}
@ -202,7 +221,8 @@ class ServerSocketChannelImpl
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (stateLock) {
stateLock.lock();
try {
ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
@ -216,6 +236,8 @@ class ServerSocketChannelImpl
Net.bind(fd, isa.getAddress(), isa.getPort());
Net.listen(fd, backlog < 1 ? 50 : backlog);
localAddress = Net.localAddress(fd);
} finally {
stateLock.unlock();
}
return this;
}
@ -229,12 +251,15 @@ class ServerSocketChannelImpl
private void begin(boolean blocking) throws ClosedChannelException {
if (blocking)
begin(); // set blocker to close channel if interrupted
synchronized (stateLock) {
stateLock.lock();
try {
ensureOpen();
if (localAddress == null)
throw new NotYetBoundException();
if (blocking)
thread = NativeThread.current();
} finally {
stateLock.unlock();
}
}
@ -248,12 +273,15 @@ class ServerSocketChannelImpl
throws AsynchronousCloseException
{
if (blocking) {
synchronized (stateLock) {
stateLock.lock();
try {
thread = 0;
// notify any thread waiting in implCloseSelectableChannel
if (state == ST_CLOSING) {
stateLock.notifyAll();
stateCondition.signalAll();
}
} finally {
stateLock.unlock();
}
end(completed);
}
@ -270,22 +298,82 @@ class ServerSocketChannelImpl
boolean blocking = isBlocking();
try {
begin(blocking);
do {
n = Net.accept(this.fd, newfd, isaa);
} while (n == IOStatus.INTERRUPTED && isOpen());
n = Net.accept(this.fd, newfd, isaa);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
park(Net.POLLIN);
n = Net.accept(this.fd, newfd, isaa);
}
}
} finally {
end(blocking, n > 0);
assert IOStatus.check(n);
}
} finally {
acceptLock.unlock();
}
if (n < 1)
if (n > 0) {
return finishAccept(newfd, isaa[0]);
} else {
return null;
}
}
InetSocketAddress isa = isaa[0];
/**
* Accepts a new connection with a given timeout. This method requires the
* channel to be configured in blocking mode.
*
* @apiNote This method is for use by the socket adaptor.
*
* @param nanos the timeout, in nanoseconds
* @throws IllegalBlockingModeException if the channel is configured non-blocking
* @throws SocketTimeoutException if the timeout expires
*/
SocketChannel blockingAccept(long nanos) throws IOException {
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
acceptLock.lock();
try {
// check that channel is configured blocking
if (!isBlocking())
throw new IllegalBlockingModeException();
try {
begin(true);
// change socket to non-blocking
lockedConfigureBlocking(false);
try {
long startNanos = System.nanoTime();
n = Net.accept(fd, newfd, isaa);
while (n == IOStatus.UNAVAILABLE && isOpen()) {
long remainingNanos = nanos - (System.nanoTime() - startNanos);
if (remainingNanos <= 0) {
throw new SocketTimeoutException("Accept timed out");
}
park(Net.POLLIN, remainingNanos);
n = Net.accept(fd, newfd, isaa);
}
} finally {
// restore socket to blocking mode
lockedConfigureBlocking(true);
}
} finally {
end(true, n > 0);
}
} finally {
acceptLock.unlock();
}
assert n > 0;
return finishAccept(newfd, isaa[0]);
}
private SocketChannel finishAccept(FileDescriptor newfd, InetSocketAddress isa)
throws IOException
{
try {
// newly accepted socket is initially in blocking mode
IOUtil.configureBlocking(newfd, true);
@ -306,15 +394,26 @@ class ServerSocketChannelImpl
protected void implConfigureBlocking(boolean block) throws IOException {
acceptLock.lock();
try {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, block);
}
lockedConfigureBlocking(block);
} finally {
acceptLock.unlock();
}
}
/**
* Adjust the blocking mode while holding acceptLock.
*/
private void lockedConfigureBlocking(boolean block) throws IOException {
assert acceptLock.isHeldByCurrentThread();
stateLock.lock();
try {
ensureOpen();
IOUtil.configureBlocking(fd, block);
} finally {
stateLock.unlock();
}
}
/**
* Invoked by implCloseChannel to close the channel.
*
@ -336,15 +435,19 @@ class ServerSocketChannelImpl
boolean blocking;
// set state to ST_CLOSING
synchronized (stateLock) {
stateLock.lock();
try {
assert state < ST_CLOSING;
state = ST_CLOSING;
blocking = isBlocking();
} finally {
stateLock.unlock();
}
// wait for any outstanding accept to complete
if (blocking) {
synchronized (stateLock) {
stateLock.lock();
try {
assert state == ST_CLOSING;
long th = thread;
if (th != 0) {
@ -354,12 +457,14 @@ class ServerSocketChannelImpl
// wait for accept operation to end
while (thread != 0) {
try {
stateLock.wait();
stateCondition.await();
} catch (InterruptedException e) {
interrupted = true;
}
}
}
} finally {
stateLock.unlock();
}
} else {
// non-blocking mode: wait for accept to complete
@ -368,9 +473,12 @@ class ServerSocketChannelImpl
}
// set state to ST_KILLPENDING
synchronized (stateLock) {
stateLock.lock();
try {
assert state == ST_CLOSING;
state = ST_KILLPENDING;
} finally {
stateLock.unlock();
}
// close socket if not registered with Selector
@ -384,11 +492,14 @@ class ServerSocketChannelImpl
@Override
public void kill() throws IOException {
synchronized (stateLock) {
stateLock.lock();
try {
if (state == ST_KILLPENDING) {
state = ST_KILLED;
nd.close(fd);
}
} finally {
stateLock.unlock();
}
}
@ -396,8 +507,11 @@ class ServerSocketChannelImpl
* Returns true if channel's socket is bound
*/
boolean isBound() {
synchronized (stateLock) {
stateLock.lock();
try {
return localAddress != null;
} finally {
stateLock.unlock();
}
}
@ -405,30 +519,11 @@ class ServerSocketChannelImpl
* Returns the local address, or null if not bound
*/
InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
}
}
/**
* Poll this channel's socket for a new connection up to the given timeout.
* @return {@code true} if there is a connection to accept
*/
boolean pollAccept(long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && isBlocking();
acceptLock.lock();
stateLock.lock();
try {
boolean polled = false;
try {
begin(true);
int events = Net.poll(fd, Net.POLLIN, timeout);
polled = (events != 0);
} finally {
end(true, polled);
}
return polled;
return localAddress;
} finally {
acceptLock.unlock();
stateLock.unlock();
}
}
@ -494,13 +589,16 @@ class ServerSocketChannelImpl
if (!isOpen()) {
sb.append("closed");
} else {
synchronized (stateLock) {
stateLock.lock();
try {
InetSocketAddress addr = localAddress;
if (addr == null) {
sb.append("unbound");
} else {
sb.append(Net.getRevealedLocalAddressAsString(addr));
}
} finally {
stateLock.unlock();
}
}
sb.append(']');