mirror of
https://github.com/openjdk/jdk.git
synced 2025-08-28 07:14:30 +02:00
8232673: (dc) DatagramChannel socket adaptor issues
Reviewed-by: dfuchs, chegar
This commit is contained in:
parent
5dafc279a7
commit
db4909bf99
8 changed files with 948 additions and 245 deletions
|
@ -606,7 +606,6 @@ class DatagramSocket implements java.io.Closeable {
|
|||
* @see #bind(SocketAddress)
|
||||
* @since 1.4
|
||||
*/
|
||||
|
||||
public SocketAddress getLocalSocketAddress() {
|
||||
if (isClosed())
|
||||
return null;
|
||||
|
@ -853,7 +852,7 @@ class DatagramSocket implements java.io.Closeable {
|
|||
public InetAddress getLocalAddress() {
|
||||
if (isClosed())
|
||||
return null;
|
||||
InetAddress in = null;
|
||||
InetAddress in;
|
||||
try {
|
||||
in = (InetAddress) getImpl().getOption(SocketOptions.SO_BINDADDR);
|
||||
if (in.isAnyLocalAddress()) {
|
||||
|
@ -874,8 +873,8 @@ class DatagramSocket implements java.io.Closeable {
|
|||
* is bound.
|
||||
*
|
||||
* @return the port number on the local host to which this socket is bound,
|
||||
{@code -1} if the socket is closed, or
|
||||
{@code 0} if it is not bound yet.
|
||||
* {@code -1} if the socket is closed, or
|
||||
* {@code 0} if it is not bound yet.
|
||||
*/
|
||||
public int getLocalPort() {
|
||||
if (isClosed())
|
||||
|
@ -887,15 +886,16 @@ class DatagramSocket implements java.io.Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/** Enable/disable SO_TIMEOUT with the specified timeout, in
|
||||
* milliseconds. With this option set to a positive timeout value,
|
||||
* a call to receive() for this DatagramSocket
|
||||
* will block for only this amount of time. If the timeout expires,
|
||||
* a <B>java.net.SocketTimeoutException</B> is raised, though the
|
||||
* DatagramSocket is still valid. A timeout of zero is interpreted
|
||||
* as an infinite timeout.
|
||||
* The option <B>must</B> be enabled prior to entering the blocking
|
||||
* operation to have effect.
|
||||
/**
|
||||
* Enable/disable SO_TIMEOUT with the specified timeout, in
|
||||
* milliseconds. With this option set to a positive timeout value,
|
||||
* a call to receive() for this DatagramSocket
|
||||
* will block for only this amount of time. If the timeout expires,
|
||||
* a <B>java.net.SocketTimeoutException</B> is raised, though the
|
||||
* DatagramSocket is still valid. A timeout of zero is interpreted
|
||||
* as an infinite timeout.
|
||||
* The option <B>must</B> be enabled prior to entering the blocking
|
||||
* operation to have effect.
|
||||
*
|
||||
* @param timeout the specified timeout in milliseconds.
|
||||
* @throws SocketException if there is an error in the underlying protocol, such as an UDP error.
|
||||
|
@ -963,8 +963,7 @@ class DatagramSocket implements java.io.Closeable {
|
|||
* negative.
|
||||
* @see #getSendBufferSize()
|
||||
*/
|
||||
public synchronized void setSendBufferSize(int size)
|
||||
throws SocketException{
|
||||
public synchronized void setSendBufferSize(int size) throws SocketException {
|
||||
if (!(size > 0)) {
|
||||
throw new IllegalArgumentException("negative send size");
|
||||
}
|
||||
|
@ -1021,8 +1020,7 @@ class DatagramSocket implements java.io.Closeable {
|
|||
* negative.
|
||||
* @see #getReceiveBufferSize()
|
||||
*/
|
||||
public synchronized void setReceiveBufferSize(int size)
|
||||
throws SocketException{
|
||||
public synchronized void setReceiveBufferSize(int size) throws SocketException {
|
||||
if (size <= 0) {
|
||||
throw new IllegalArgumentException("invalid receive size");
|
||||
}
|
||||
|
@ -1039,8 +1037,7 @@ class DatagramSocket implements java.io.Closeable {
|
|||
* @throws SocketException if there is an error in the underlying protocol, such as an UDP error.
|
||||
* @see #setReceiveBufferSize(int)
|
||||
*/
|
||||
public synchronized int getReceiveBufferSize()
|
||||
throws SocketException{
|
||||
public synchronized int getReceiveBufferSize() throws SocketException {
|
||||
if (isClosed())
|
||||
throw new SocketException("Socket is closed");
|
||||
int result = 0;
|
||||
|
|
|
@ -28,6 +28,8 @@ package sun.nio.ch;
|
|||
import java.io.FileDescriptor;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.lang.ref.Cleaner.Cleanable;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.Inet4Address;
|
||||
|
@ -39,6 +41,7 @@ import java.net.PortUnreachableException;
|
|||
import java.net.ProtocolFamily;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketOption;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.StandardProtocolFamily;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -47,6 +50,7 @@ import java.nio.channels.AlreadyConnectedException;
|
|||
import java.nio.channels.AsynchronousCloseException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.IllegalBlockingModeException;
|
||||
import java.nio.channels.MembershipKey;
|
||||
import java.nio.channels.NotYetConnectedException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
|
@ -113,8 +117,17 @@ class DatagramChannelImpl
|
|||
private InetSocketAddress localAddress;
|
||||
private InetSocketAddress remoteAddress;
|
||||
|
||||
// Our socket adaptor, if any
|
||||
private DatagramSocket socket;
|
||||
// Socket adaptor, created lazily
|
||||
private static final VarHandle SOCKET;
|
||||
static {
|
||||
try {
|
||||
MethodHandles.Lookup l = MethodHandles.lookup();
|
||||
SOCKET = l.findVarHandle(DatagramChannelImpl.class, "socket", DatagramSocket.class);
|
||||
} catch (Exception e) {
|
||||
throw new InternalError(e);
|
||||
}
|
||||
}
|
||||
private volatile DatagramSocket socket;
|
||||
|
||||
// Multicast support
|
||||
private MembershipRegistry registry;
|
||||
|
@ -199,11 +212,14 @@ class DatagramChannelImpl
|
|||
|
||||
@Override
|
||||
public DatagramSocket socket() {
|
||||
synchronized (stateLock) {
|
||||
if (socket == null)
|
||||
socket = DatagramSocketAdaptor.create(this);
|
||||
return socket;
|
||||
DatagramSocket socket = this.socket;
|
||||
if (socket == null) {
|
||||
socket = DatagramSocketAdaptor.create(this);
|
||||
if (!SOCKET.compareAndSet(this, null, socket)) {
|
||||
socket = this.socket;
|
||||
}
|
||||
}
|
||||
return socket;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -408,62 +424,35 @@ class DatagramChannelImpl
|
|||
public SocketAddress receive(ByteBuffer dst) throws IOException {
|
||||
if (dst.isReadOnly())
|
||||
throw new IllegalArgumentException("Read-only buffer");
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
boolean blocking = isBlocking();
|
||||
boolean completed = false;
|
||||
int n = 0;
|
||||
ByteBuffer bb = null;
|
||||
try {
|
||||
SocketAddress remote = beginRead(blocking, false);
|
||||
boolean connected = (remote != null);
|
||||
SecurityManager sm = System.getSecurityManager();
|
||||
|
||||
if (connected || (sm == null)) {
|
||||
// connected or no security manager
|
||||
n = receive(fd, dst, connected);
|
||||
n = receive(dst, connected);
|
||||
if (blocking) {
|
||||
while (IOStatus.okayToRetry(n) && isOpen()) {
|
||||
park(Net.POLLIN);
|
||||
n = receive(fd, dst, connected);
|
||||
n = receive(dst, connected);
|
||||
}
|
||||
} else if (n == IOStatus.UNAVAILABLE) {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
// Cannot receive into user's buffer when running with a
|
||||
// security manager and not connected
|
||||
bb = Util.getTemporaryDirectBuffer(dst.remaining());
|
||||
for (;;) {
|
||||
n = receive(fd, bb, connected);
|
||||
if (blocking) {
|
||||
while (IOStatus.okayToRetry(n) && isOpen()) {
|
||||
park(Net.POLLIN);
|
||||
n = receive(fd, bb, connected);
|
||||
}
|
||||
} else if (n == IOStatus.UNAVAILABLE) {
|
||||
return null;
|
||||
}
|
||||
InetSocketAddress isa = (InetSocketAddress)sender;
|
||||
try {
|
||||
sm.checkAccept(isa.getAddress().getHostAddress(),
|
||||
isa.getPort());
|
||||
} catch (SecurityException se) {
|
||||
// Ignore packet
|
||||
bb.clear();
|
||||
n = 0;
|
||||
continue;
|
||||
}
|
||||
bb.flip();
|
||||
dst.put(bb);
|
||||
break;
|
||||
}
|
||||
// security manager and unconnected
|
||||
n = untrustedReceive(dst);
|
||||
}
|
||||
assert sender != null;
|
||||
if (n == IOStatus.UNAVAILABLE)
|
||||
return null;
|
||||
completed = (n > 0) || (n == 0 && isOpen());
|
||||
return sender;
|
||||
} finally {
|
||||
if (bb != null)
|
||||
Util.releaseTemporaryDirectBuffer(bb);
|
||||
endRead(blocking, n > 0);
|
||||
endRead(blocking, completed);
|
||||
assert IOStatus.check(n);
|
||||
}
|
||||
} finally {
|
||||
|
@ -471,15 +460,164 @@ class DatagramChannelImpl
|
|||
}
|
||||
}
|
||||
|
||||
private int receive(FileDescriptor fd, ByteBuffer dst, boolean connected)
|
||||
/**
|
||||
* Receives a datagram into an untrusted buffer. When there is a security
|
||||
* manager set, and the socket is not connected, datagrams have to be received
|
||||
* into a buffer that is not accessible to the user. The datagram is copied
|
||||
* into the user's buffer when the sender address is accepted by the security
|
||||
* manager.
|
||||
*
|
||||
* @return the size of the datagram or IOStatus.UNAVAILABLE
|
||||
*/
|
||||
private int untrustedReceive(ByteBuffer dst) throws IOException {
|
||||
SecurityManager sm = System.getSecurityManager();
|
||||
assert readLock.isHeldByCurrentThread()
|
||||
&& sm != null && remoteAddress == null;
|
||||
|
||||
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
|
||||
try {
|
||||
boolean blocking = isBlocking();
|
||||
for (;;) {
|
||||
int n = receive(bb, false);
|
||||
if (blocking) {
|
||||
while (IOStatus.okayToRetry(n) && isOpen()) {
|
||||
park(Net.POLLIN);
|
||||
n = receive(bb, false);
|
||||
}
|
||||
} else if (n == IOStatus.UNAVAILABLE) {
|
||||
return n;
|
||||
}
|
||||
InetSocketAddress isa = (InetSocketAddress) sender;
|
||||
try {
|
||||
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
|
||||
bb.flip();
|
||||
dst.put(bb);
|
||||
return n;
|
||||
} catch (SecurityException se) {
|
||||
// ignore datagram
|
||||
bb.clear();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
Util.releaseTemporaryDirectBuffer(bb);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives a datagram into the given buffer.
|
||||
*
|
||||
* @apiNote This method is for use by the socket adaptor. The buffer is
|
||||
* assumed to be trusted, meaning it is not accessible to user code.
|
||||
*
|
||||
* @throws IllegalBlockingModeException if the channel is non-blocking
|
||||
* @throws SocketTimeoutException if the timeout elapses
|
||||
*/
|
||||
SocketAddress blockingReceive(ByteBuffer dst, long nanos) throws IOException {
|
||||
readLock.lock();
|
||||
try {
|
||||
ensureOpen();
|
||||
if (!isBlocking())
|
||||
throw new IllegalBlockingModeException();
|
||||
SecurityManager sm = System.getSecurityManager();
|
||||
boolean connected = isConnected();
|
||||
SocketAddress sender;
|
||||
do {
|
||||
if (nanos > 0) {
|
||||
sender = trustedBlockingReceive(dst, nanos);
|
||||
} else {
|
||||
sender = trustedBlockingReceive(dst);
|
||||
}
|
||||
// check sender when security manager set and not connected
|
||||
if (sm != null && !connected) {
|
||||
InetSocketAddress isa = (InetSocketAddress) sender;
|
||||
try {
|
||||
sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort());
|
||||
} catch (SecurityException e) {
|
||||
sender = null;
|
||||
}
|
||||
}
|
||||
} while (sender == null);
|
||||
return sender;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives a datagram into given buffer. This method is used to support
|
||||
* the socket adaptor. The buffer is assumed to be trusted.
|
||||
* @throws SocketTimeoutException if the timeout elapses
|
||||
*/
|
||||
private SocketAddress trustedBlockingReceive(ByteBuffer dst)
|
||||
throws IOException
|
||||
{
|
||||
assert readLock.isHeldByCurrentThread() && isBlocking();
|
||||
boolean completed = false;
|
||||
int n = 0;
|
||||
try {
|
||||
SocketAddress remote = beginRead(true, false);
|
||||
boolean connected = (remote != null);
|
||||
n = receive(dst, connected);
|
||||
while (n == IOStatus.UNAVAILABLE && isOpen()) {
|
||||
park(Net.POLLIN);
|
||||
n = receive(dst, connected);
|
||||
}
|
||||
completed = (n > 0) || (n == 0 && isOpen());
|
||||
return sender;
|
||||
} finally {
|
||||
endRead(true, completed);
|
||||
assert IOStatus.check(n);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Receives a datagram into given buffer with a timeout. This method is
|
||||
* used to support the socket adaptor. The buffer is assumed to be trusted.
|
||||
* @throws SocketTimeoutException if the timeout elapses
|
||||
*/
|
||||
private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos)
|
||||
throws IOException
|
||||
{
|
||||
assert readLock.isHeldByCurrentThread() && isBlocking();
|
||||
boolean completed = false;
|
||||
int n = 0;
|
||||
try {
|
||||
SocketAddress remote = beginRead(true, false);
|
||||
boolean connected = (remote != null);
|
||||
|
||||
// change socket to non-blocking
|
||||
lockedConfigureBlocking(false);
|
||||
try {
|
||||
long startNanos = System.nanoTime();
|
||||
n = receive(dst, connected);
|
||||
while (n == IOStatus.UNAVAILABLE && isOpen()) {
|
||||
long remainingNanos = nanos - (System.nanoTime() - startNanos);
|
||||
if (remainingNanos <= 0) {
|
||||
throw new SocketTimeoutException("Receive timed out");
|
||||
}
|
||||
park(Net.POLLIN, remainingNanos);
|
||||
n = receive(dst, connected);
|
||||
}
|
||||
completed = (n > 0) || (n == 0 && isOpen());
|
||||
return sender;
|
||||
} finally {
|
||||
// restore socket to blocking mode (if channel is open)
|
||||
tryLockedConfigureBlocking(true);
|
||||
}
|
||||
|
||||
} finally {
|
||||
endRead(true, completed);
|
||||
assert IOStatus.check(n);
|
||||
}
|
||||
}
|
||||
|
||||
private int receive(ByteBuffer dst, boolean connected) throws IOException {
|
||||
int pos = dst.position();
|
||||
int lim = dst.limit();
|
||||
assert (pos <= lim);
|
||||
int rem = (pos <= lim ? lim - pos : 0);
|
||||
if (dst instanceof DirectBuffer && rem > 0)
|
||||
return receiveIntoNativeBuffer(fd, dst, rem, pos, connected);
|
||||
return receiveIntoNativeBuffer(dst, rem, pos, connected);
|
||||
|
||||
// Substitute a native buffer. If the supplied buffer is empty
|
||||
// we must instead use a nonempty buffer, otherwise the call
|
||||
|
@ -487,7 +625,7 @@ class DatagramChannelImpl
|
|||
int newSize = Math.max(rem, 1);
|
||||
ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize);
|
||||
try {
|
||||
int n = receiveIntoNativeBuffer(fd, bb, newSize, 0, connected);
|
||||
int n = receiveIntoNativeBuffer(bb, newSize, 0, connected);
|
||||
bb.flip();
|
||||
if (n > 0 && rem > 0)
|
||||
dst.put(bb);
|
||||
|
@ -497,8 +635,8 @@ class DatagramChannelImpl
|
|||
}
|
||||
}
|
||||
|
||||
private int receiveIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
|
||||
int rem, int pos, boolean connected)
|
||||
private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos,
|
||||
boolean connected)
|
||||
throws IOException
|
||||
{
|
||||
int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected);
|
||||
|
@ -563,6 +701,25 @@ class DatagramChannelImpl
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a datagram from the bytes in given buffer.
|
||||
*
|
||||
* @apiNote This method is for use by the socket adaptor.
|
||||
*
|
||||
* @throws IllegalBlockingModeException if the channel is non-blocking
|
||||
*/
|
||||
void blockingSend(ByteBuffer src, SocketAddress target) throws IOException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
ensureOpen();
|
||||
if (!isBlocking())
|
||||
throw new IllegalBlockingModeException();
|
||||
send(src, target);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target)
|
||||
throws IOException
|
||||
{
|
||||
|
@ -785,10 +942,7 @@ class DatagramChannelImpl
|
|||
try {
|
||||
writeLock.lock();
|
||||
try {
|
||||
synchronized (stateLock) {
|
||||
ensureOpen();
|
||||
IOUtil.configureBlocking(fd, block);
|
||||
}
|
||||
lockedConfigureBlocking(block);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
@ -797,6 +951,36 @@ class DatagramChannelImpl
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adjusts the blocking mode. readLock or writeLock must already be held.
|
||||
*/
|
||||
private void lockedConfigureBlocking(boolean block) throws IOException {
|
||||
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
|
||||
synchronized (stateLock) {
|
||||
ensureOpen();
|
||||
IOUtil.configureBlocking(fd, block);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adjusts the blocking mode if the channel is open. readLock or writeLock
|
||||
* must already be held.
|
||||
*
|
||||
* @return {@code true} if the blocking mode was adjusted, {@code false} if
|
||||
* the blocking mode was not adjusted because the channel is closed
|
||||
*/
|
||||
private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
|
||||
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
|
||||
synchronized (stateLock) {
|
||||
if (isOpen()) {
|
||||
IOUtil.configureBlocking(fd, block);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
InetSocketAddress localAddress() {
|
||||
synchronized (stateLock) {
|
||||
return localAddress;
|
||||
|
@ -861,6 +1045,16 @@ class DatagramChannelImpl
|
|||
|
||||
@Override
|
||||
public DatagramChannel connect(SocketAddress sa) throws IOException {
|
||||
return connect(sa, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects the channel's socket.
|
||||
*
|
||||
* @param sa the remote address to which this channel is to be connected
|
||||
* @param check true to check if the channel is already connected.
|
||||
*/
|
||||
DatagramChannel connect(SocketAddress sa, boolean check) throws IOException {
|
||||
InetSocketAddress isa = Net.checkAddress(sa, family);
|
||||
SecurityManager sm = System.getSecurityManager();
|
||||
if (sm != null) {
|
||||
|
@ -879,7 +1073,7 @@ class DatagramChannelImpl
|
|||
try {
|
||||
synchronized (stateLock) {
|
||||
ensureOpen();
|
||||
if (state == ST_CONNECTED)
|
||||
if (check && state == ST_CONNECTED)
|
||||
throw new AlreadyConnectedException();
|
||||
|
||||
// ensure that the socket is bound
|
||||
|
@ -908,7 +1102,7 @@ class DatagramChannelImpl
|
|||
}
|
||||
try {
|
||||
ByteBuffer buf = ByteBuffer.allocate(100);
|
||||
while (receive(fd, buf, false) > 0) {
|
||||
while (receive(buf, false) >= 0) {
|
||||
buf.clear();
|
||||
}
|
||||
} finally {
|
||||
|
@ -1331,30 +1525,6 @@ class DatagramChannelImpl
|
|||
return translateReadyOps(ops, 0, ski);
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll this channel's socket for reading up to the given timeout.
|
||||
* @return {@code true} if the socket is polled
|
||||
*/
|
||||
boolean pollRead(long timeout) throws IOException {
|
||||
boolean blocking = isBlocking();
|
||||
assert Thread.holdsLock(blockingLock()) && blocking;
|
||||
|
||||
readLock.lock();
|
||||
try {
|
||||
boolean polled = false;
|
||||
try {
|
||||
beginRead(blocking, false);
|
||||
int events = Net.poll(fd, Net.POLLIN, timeout);
|
||||
polled = (events != 0);
|
||||
} finally {
|
||||
endRead(blocking, polled);
|
||||
}
|
||||
return polled;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Translates an interest operation set into a native poll event set
|
||||
*/
|
||||
|
|
|
@ -26,6 +26,9 @@
|
|||
package sun.nio.ch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.lang.invoke.MethodHandles.Lookup;
|
||||
import java.lang.invoke.VarHandle;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.DatagramSocketImpl;
|
||||
|
@ -35,15 +38,16 @@ import java.net.NetworkInterface;
|
|||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketOption;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.AlreadyConnectedException;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.IllegalBlockingModeException;
|
||||
import java.util.Objects;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
// Make a datagram-socket channel look like a datagram socket.
|
||||
//
|
||||
|
@ -61,13 +65,9 @@ class DatagramSocketAdaptor
|
|||
// Timeout "option" value for receives
|
||||
private volatile int timeout;
|
||||
|
||||
// ## super will create a useless impl
|
||||
// create DatagramSocket with useless impl
|
||||
private DatagramSocketAdaptor(DatagramChannelImpl dc) {
|
||||
// Invoke the DatagramSocketAdaptor(SocketAddress) constructor,
|
||||
// passing a dummy DatagramSocketImpl object to avoid any native
|
||||
// resource allocation in super class and invoking our bind method
|
||||
// before the dc field is initialized.
|
||||
super(dummyDatagramSocket);
|
||||
super(new DummyDatagramSocketImpl());
|
||||
this.dc = dc;
|
||||
}
|
||||
|
||||
|
@ -75,17 +75,9 @@ class DatagramSocketAdaptor
|
|||
return new DatagramSocketAdaptor(dc);
|
||||
}
|
||||
|
||||
private void connectInternal(SocketAddress remote)
|
||||
throws SocketException
|
||||
{
|
||||
InetSocketAddress isa = Net.asInetSocketAddress(remote);
|
||||
int port = isa.getPort();
|
||||
if (port < 0 || port > 0xFFFF)
|
||||
throw new IllegalArgumentException("connect: " + port);
|
||||
if (remote == null)
|
||||
throw new IllegalArgumentException("connect: null address");
|
||||
private void connectInternal(SocketAddress remote) throws SocketException {
|
||||
try {
|
||||
dc.connect(remote);
|
||||
dc.connect(remote, false); // skips check for already connected
|
||||
} catch (ClosedChannelException e) {
|
||||
// ignore
|
||||
} catch (Exception x) {
|
||||
|
@ -95,9 +87,12 @@ class DatagramSocketAdaptor
|
|||
|
||||
@Override
|
||||
public void bind(SocketAddress local) throws SocketException {
|
||||
if (local != null) {
|
||||
local = Net.asInetSocketAddress(local);
|
||||
} else {
|
||||
local = new InetSocketAddress(0);
|
||||
}
|
||||
try {
|
||||
if (local == null)
|
||||
local = new InetSocketAddress(0);
|
||||
dc.bind(local);
|
||||
} catch (Exception x) {
|
||||
Net.translateToSocketException(x);
|
||||
|
@ -106,17 +101,20 @@ class DatagramSocketAdaptor
|
|||
|
||||
@Override
|
||||
public void connect(InetAddress address, int port) {
|
||||
if (address == null)
|
||||
throw new IllegalArgumentException("Address can't be null");
|
||||
try {
|
||||
connectInternal(new InetSocketAddress(address, port));
|
||||
} catch (SocketException x) {
|
||||
// Yes, j.n.DatagramSocket really does this
|
||||
throw new Error(x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connect(SocketAddress remote) throws SocketException {
|
||||
Objects.requireNonNull(remote, "Address can't be null");
|
||||
connectInternal(remote);
|
||||
if (remote == null)
|
||||
throw new IllegalArgumentException("Address can't be null");
|
||||
connectInternal(Net.asInetSocketAddress(remote));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -157,80 +155,84 @@ class DatagramSocketAdaptor
|
|||
|
||||
@Override
|
||||
public SocketAddress getLocalSocketAddress() {
|
||||
return dc.localAddress();
|
||||
try {
|
||||
return dc.getLocalAddress();
|
||||
} catch (ClosedChannelException e) {
|
||||
return null;
|
||||
} catch (Exception x) {
|
||||
throw new Error(x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(DatagramPacket p) throws IOException {
|
||||
synchronized (dc.blockingLock()) {
|
||||
if (!dc.isBlocking())
|
||||
throw new IllegalBlockingModeException();
|
||||
try {
|
||||
synchronized (p) {
|
||||
ByteBuffer bb = ByteBuffer.wrap(p.getData(),
|
||||
p.getOffset(),
|
||||
p.getLength());
|
||||
if (dc.isConnected()) {
|
||||
if (p.getAddress() == null) {
|
||||
// Legacy DatagramSocket will send in this case
|
||||
// and set address and port of the packet
|
||||
InetSocketAddress isa = dc.remoteAddress();
|
||||
p.setPort(isa.getPort());
|
||||
p.setAddress(isa.getAddress());
|
||||
dc.write(bb);
|
||||
} else {
|
||||
// Target address may not match connected address
|
||||
dc.send(bb, p.getSocketAddress());
|
||||
}
|
||||
} else {
|
||||
// Not connected so address must be valid or throw
|
||||
dc.send(bb, p.getSocketAddress());
|
||||
ByteBuffer bb = null;
|
||||
try {
|
||||
InetSocketAddress target;
|
||||
synchronized (p) {
|
||||
// copy bytes to temporary direct buffer
|
||||
int len = p.getLength();
|
||||
bb = Util.getTemporaryDirectBuffer(len);
|
||||
bb.put(p.getData(), p.getOffset(), len);
|
||||
bb.flip();
|
||||
|
||||
// target address
|
||||
if (p.getAddress() == null) {
|
||||
InetSocketAddress remote = dc.remoteAddress();
|
||||
if (remote == null) {
|
||||
// not specified by DatagramSocket
|
||||
throw new IllegalArgumentException("Address not set");
|
||||
}
|
||||
// set address/port to maintain compatibility with DatagramSocket
|
||||
p.setAddress(remote.getAddress());
|
||||
p.setPort(remote.getPort());
|
||||
target = remote;
|
||||
} else {
|
||||
// throws IllegalArgumentException if port not set
|
||||
target = (InetSocketAddress) p.getSocketAddress();
|
||||
}
|
||||
} catch (IOException x) {
|
||||
Net.translateException(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private SocketAddress receive(ByteBuffer bb) throws IOException {
|
||||
assert Thread.holdsLock(dc.blockingLock()) && dc.isBlocking();
|
||||
|
||||
long to = this.timeout;
|
||||
if (to == 0) {
|
||||
return dc.receive(bb);
|
||||
} else {
|
||||
for (;;) {
|
||||
if (!dc.isOpen())
|
||||
throw new ClosedChannelException();
|
||||
long st = System.currentTimeMillis();
|
||||
if (dc.pollRead(to)) {
|
||||
return dc.receive(bb);
|
||||
}
|
||||
to -= System.currentTimeMillis() - st;
|
||||
if (to <= 0)
|
||||
throw new SocketTimeoutException();
|
||||
// send datagram
|
||||
try {
|
||||
dc.blockingSend(bb, target);
|
||||
} catch (AlreadyConnectedException e) {
|
||||
throw new IllegalArgumentException("Connected and packet address differ");
|
||||
} catch (ClosedChannelException e) {
|
||||
var exc = new SocketException("Socket closed");
|
||||
exc.initCause(e);
|
||||
throw exc;
|
||||
}
|
||||
} finally {
|
||||
if (bb != null) {
|
||||
Util.offerFirstTemporaryDirectBuffer(bb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void receive(DatagramPacket p) throws IOException {
|
||||
synchronized (dc.blockingLock()) {
|
||||
if (!dc.isBlocking())
|
||||
throw new IllegalBlockingModeException();
|
||||
try {
|
||||
synchronized (p) {
|
||||
ByteBuffer bb = ByteBuffer.wrap(p.getData(),
|
||||
p.getOffset(),
|
||||
p.getLength());
|
||||
SocketAddress sender = receive(bb);
|
||||
p.setSocketAddress(sender);
|
||||
p.setLength(bb.position() - p.getOffset());
|
||||
}
|
||||
} catch (IOException x) {
|
||||
Net.translateException(x);
|
||||
// get temporary direct buffer with a capacity of p.bufLength
|
||||
int bufLength = DatagramPackets.getBufLength(p);
|
||||
ByteBuffer bb = Util.getTemporaryDirectBuffer(bufLength);
|
||||
try {
|
||||
long nanos = MILLISECONDS.toNanos(timeout);
|
||||
SocketAddress sender = dc.blockingReceive(bb, nanos);
|
||||
bb.flip();
|
||||
synchronized (p) {
|
||||
// copy bytes to the DatagramPacket and set length
|
||||
int len = Math.min(bb.limit(), DatagramPackets.getBufLength(p));
|
||||
bb.get(p.getData(), p.getOffset(), len);
|
||||
DatagramPackets.setLength(p, len);
|
||||
|
||||
// sender address
|
||||
p.setSocketAddress(sender);
|
||||
}
|
||||
} catch (ClosedChannelException e) {
|
||||
var exc = new SocketException("Socket closed");
|
||||
exc.initCause(e);
|
||||
throw exc;
|
||||
} finally {
|
||||
Util.offerFirstTemporaryDirectBuffer(bb);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -257,19 +259,16 @@ class DatagramSocketAdaptor
|
|||
public int getLocalPort() {
|
||||
if (isClosed())
|
||||
return -1;
|
||||
try {
|
||||
InetSocketAddress local = dc.localAddress();
|
||||
if (local != null) {
|
||||
return local.getPort();
|
||||
}
|
||||
} catch (Exception x) {
|
||||
InetSocketAddress local = dc.localAddress();
|
||||
if (local != null) {
|
||||
return local.getPort();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setSoTimeout(int timeout) throws SocketException {
|
||||
if (!dc.isOpen())
|
||||
if (isClosed())
|
||||
throw new SocketException("Socket is closed");
|
||||
if (timeout < 0)
|
||||
throw new IllegalArgumentException("timeout < 0");
|
||||
|
@ -278,7 +277,7 @@ class DatagramSocketAdaptor
|
|||
|
||||
@Override
|
||||
public int getSoTimeout() throws SocketException {
|
||||
if (!dc.isOpen())
|
||||
if (isClosed())
|
||||
throw new SocketException("Socket is closed");
|
||||
return timeout;
|
||||
}
|
||||
|
@ -353,7 +352,6 @@ class DatagramSocketAdaptor
|
|||
@Override
|
||||
public boolean getReuseAddress() throws SocketException {
|
||||
return getBooleanOption(StandardSocketOptions.SO_REUSEADDR);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -411,50 +409,157 @@ class DatagramSocketAdaptor
|
|||
return dc.supportedOptions();
|
||||
}
|
||||
|
||||
/*
|
||||
* A dummy implementation of DatagramSocketImpl that can be passed to the
|
||||
* DatagramSocket constructor so that no native resources are allocated in
|
||||
* super class.
|
||||
*/
|
||||
private static final DatagramSocketImpl dummyDatagramSocket
|
||||
= new DatagramSocketImpl()
|
||||
{
|
||||
protected void create() throws SocketException {}
|
||||
|
||||
protected void bind(int lport, InetAddress laddr) throws SocketException {}
|
||||
/**
|
||||
* DatagramSocketImpl implementation where all methods throw an error.
|
||||
*/
|
||||
private static class DummyDatagramSocketImpl extends DatagramSocketImpl {
|
||||
private static <T> T shouldNotGetHere() {
|
||||
throw new InternalError("Should not get here");
|
||||
}
|
||||
|
||||
protected void send(DatagramPacket p) throws IOException {}
|
||||
@Override
|
||||
protected void create() {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected int peek(InetAddress i) throws IOException { return 0; }
|
||||
@Override
|
||||
protected void bind(int lport, InetAddress laddr) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected int peekData(DatagramPacket p) throws IOException { return 0; }
|
||||
@Override
|
||||
protected void send(DatagramPacket p) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected void receive(DatagramPacket p) throws IOException {}
|
||||
@Override
|
||||
protected int peek(InetAddress address) {
|
||||
return shouldNotGetHere();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
protected void setTTL(byte ttl) throws IOException {}
|
||||
@Override
|
||||
protected int peekData(DatagramPacket p) {
|
||||
return shouldNotGetHere();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
protected byte getTTL() throws IOException { return 0; }
|
||||
@Override
|
||||
protected void receive(DatagramPacket p) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected void setTimeToLive(int ttl) throws IOException {}
|
||||
@Deprecated
|
||||
protected void setTTL(byte ttl) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected int getTimeToLive() throws IOException { return 0;}
|
||||
@Deprecated
|
||||
protected byte getTTL() {
|
||||
return shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected void join(InetAddress inetaddr) throws IOException {}
|
||||
@Override
|
||||
protected void setTimeToLive(int ttl) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected void leave(InetAddress inetaddr) throws IOException {}
|
||||
@Override
|
||||
protected int getTimeToLive() {
|
||||
return shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected void joinGroup(SocketAddress mcastaddr,
|
||||
NetworkInterface netIf) throws IOException {}
|
||||
@Override
|
||||
protected void join(InetAddress group) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected void leaveGroup(SocketAddress mcastaddr,
|
||||
NetworkInterface netIf) throws IOException {}
|
||||
@Override
|
||||
protected void leave(InetAddress inetaddr) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
protected void close() {}
|
||||
@Override
|
||||
protected void joinGroup(SocketAddress group, NetworkInterface netIf) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
public Object getOption(int optID) throws SocketException { return null;}
|
||||
@Override
|
||||
protected void leaveGroup(SocketAddress mcastaddr, NetworkInterface netIf) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
public void setOption(int optID, Object value) throws SocketException {}
|
||||
};
|
||||
}
|
||||
@Override
|
||||
protected void close() {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getOption(int optID) {
|
||||
return shouldNotGetHere();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setOption(int optID, Object value) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> void setOption(SocketOption<T> name, T value) {
|
||||
shouldNotGetHere();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected <T> T getOption(SocketOption<T> name) {
|
||||
return shouldNotGetHere();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Set<SocketOption<?>> supportedOptions() {
|
||||
return shouldNotGetHere();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines static methods to get/set DatagramPacket fields and workaround
|
||||
* DatagramPacket deficiencies.
|
||||
*/
|
||||
private static class DatagramPackets {
|
||||
private static final VarHandle LENGTH;
|
||||
private static final VarHandle BUF_LENGTH;
|
||||
static {
|
||||
try {
|
||||
PrivilegedAction<Lookup> pa = () -> {
|
||||
try {
|
||||
return MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup());
|
||||
} catch (Exception e) {
|
||||
throw new ExceptionInInitializerError(e);
|
||||
}
|
||||
};
|
||||
MethodHandles.Lookup l = AccessController.doPrivileged(pa);
|
||||
LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class);
|
||||
BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class);
|
||||
} catch (Exception e) {
|
||||
throw new ExceptionInInitializerError(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be
|
||||
* used at this time because it sets both the length and bufLength fields.
|
||||
*/
|
||||
static void setLength(DatagramPacket p, int value) {
|
||||
synchronized (p) {
|
||||
LENGTH.set(p, value);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value of the DatagramPacket.bufLength field.
|
||||
*/
|
||||
static int getBufLength(DatagramPacket p) {
|
||||
synchronized (p) {
|
||||
return (int) BUF_LENGTH.get(p);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue