From 028068a655bb08e016e7a915c2b2f6abc1e480a0 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Wed, 19 Jul 2023 13:17:37 +0000 Subject: [PATCH] 8312166: (dc) DatagramChannel's socket adaptor does not release carrier thread when blocking in receive Reviewed-by: jpai, michaelm --- .../sun/nio/ch/DatagramChannelImpl.java | 274 ++++++++++++------ .../sun/nio/ch/DatagramSocketAdaptor.java | 128 ++------ .../net/DatagramSocket/TimeoutWithSM.java | 118 ++++++++ 3 files changed, 324 insertions(+), 196 deletions(-) create mode 100644 test/jdk/java/net/DatagramSocket/TimeoutWithSM.java diff --git a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java index 17e9c54fe94..af8f0b987e9 100644 --- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java @@ -33,6 +33,7 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.lang.ref.Cleaner.Cleanable; import java.lang.reflect.Method; +import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.Inet4Address; import java.net.Inet6Address; @@ -650,114 +651,130 @@ class DatagramChannelImpl } /** - * Receives a datagram into the given buffer. + * Receives a datagram. * - * @apiNote This method is for use by the socket adaptor. The buffer is - * assumed to be trusted, meaning it is not accessible to user code. + * @apiNote This method is for use by the socket adaptor. * * @throws IllegalBlockingModeException if the channel is non-blocking * @throws SocketTimeoutException if the timeout elapses */ - SocketAddress blockingReceive(ByteBuffer dst, long nanos) throws IOException { + void blockingReceive(DatagramPacket p, long nanos) throws IOException { + Objects.requireNonNull(p); + assert nanos >= 0; + readLock.lock(); try { ensureOpen(); if (!isBlocking()) throw new IllegalBlockingModeException(); - @SuppressWarnings("removal") - SecurityManager sm = System.getSecurityManager(); - boolean connected = isConnected(); - SocketAddress sender; - do { - if (nanos > 0) { - sender = trustedBlockingReceive(dst, nanos); - } else { - sender = trustedBlockingReceive(dst); - } - // check sender when security manager set and not connected - if (sm != null && !connected) { - InetSocketAddress isa = (InetSocketAddress) sender; - try { - sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); - } catch (SecurityException e) { - sender = null; + + // underlying socket needs to be non-blocking if timed receive or virtual thread + if (nanos > 0) { + configureSocketNonBlocking(); + } else { + configureSocketNonBlockingIfVirtualThread(); + nanos = Long.MAX_VALUE; + } + + // p.bufLength is the maximum size of the datagram that can be received + int bufLength; + synchronized (p) { + bufLength = DatagramPackets.getBufLength(p); + } + + long startNanos = System.nanoTime(); + SocketAddress sender = null; + try { + SocketAddress remote = beginRead(true, false); + boolean connected = (remote != null); + do { + long remainingNanos = nanos - (System.nanoTime() - startNanos); + ByteBuffer dst = tryBlockingReceive(connected, bufLength, remainingNanos); + + // if datagram received then get sender and copy to DatagramPacket + if (dst != null) { + try { + // sender address is in socket address buffer + sender = sourceSocketAddress(); + + // check sender when security manager set and not connected + @SuppressWarnings("removal") + SecurityManager sm = System.getSecurityManager(); + if (sm != null && !connected) { + InetSocketAddress isa = (InetSocketAddress) sender; + try { + sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); + } catch (SecurityException e) { + sender = null; + } + } + + // copy bytes to the DatagramPacket, and set length and sender + if (sender != null) { + synchronized (p) { + // re-read p.bufLength in case DatagramPacket changed + int len = Math.min(dst.limit(), DatagramPackets.getBufLength(p)); + dst.get(p.getData(), p.getOffset(), len); + DatagramPackets.setLength(p, len); + p.setSocketAddress(sender); + } + } + } finally { + Util.offerFirstTemporaryDirectBuffer(dst); + } } - } - } while (sender == null); - return sender; + } while (sender == null && isOpen()); + } finally { + endRead(true, (sender != null)); + } } finally { readLock.unlock(); } } /** - * Receives a datagram into given buffer. This method is used to support - * the socket adaptor. The buffer is assumed to be trusted. + * Attempt to receive a datagram. + * + * @param connected if the channel's socket is connected + * @param len the maximum size of the datagram to receive + * @param nanos the timeout, should be Long.MAX_VALUE for untimed + * @return a direct buffer containing the datagram or null if channel is closed * @throws SocketTimeoutException if the timeout elapses */ - private SocketAddress trustedBlockingReceive(ByteBuffer dst) + private ByteBuffer tryBlockingReceive(boolean connected, int len, long nanos) throws IOException { - assert readLock.isHeldByCurrentThread() && isBlocking(); - SocketAddress sender = null; + long startNanos = System.nanoTime(); + ByteBuffer dst = Util.getTemporaryDirectBuffer(len); + int n = -1; try { - SocketAddress remote = beginRead(true, false); - configureSocketNonBlockingIfVirtualThread(); - boolean connected = (remote != null); - int n = receive(dst, connected); - while (IOStatus.okayToRetry(n) && isOpen()) { - park(Net.POLLIN); + n = receive(dst, connected); + while (n == IOStatus.UNAVAILABLE && isOpen()) { + // virtual thread needs to release temporary direct buffer before parking + if (Thread.currentThread().isVirtual()) { + Util.offerFirstTemporaryDirectBuffer(dst); + dst = null; + } + long remainingNanos = nanos - (System.nanoTime() - startNanos); + if (remainingNanos <= 0) { + throw new SocketTimeoutException("Receive timed out"); + } + park(Net.POLLIN, remainingNanos); + // virtual thread needs to re-allocate temporary direct buffer after parking + if (Thread.currentThread().isVirtual()) { + dst = Util.getTemporaryDirectBuffer(len); + } n = receive(dst, connected); } - if (n > 0 || (n == 0 && isOpen())) { - // sender address is in socket address buffer - sender = sourceSocketAddress(); - } - return sender; + dst.flip(); } finally { - endRead(true, (sender != null)); - } - } - - /** - * Receives a datagram into given buffer with a timeout. This method is - * used to support the socket adaptor. The buffer is assumed to be trusted. - * @throws SocketTimeoutException if the timeout elapses - */ - private SocketAddress trustedBlockingReceive(ByteBuffer dst, long nanos) - throws IOException - { - assert readLock.isHeldByCurrentThread() && isBlocking(); - SocketAddress sender = null; - try { - SocketAddress remote = beginRead(true, false); - boolean connected = (remote != null); - - // change socket to non-blocking - lockedConfigureBlocking(false); - try { - long startNanos = System.nanoTime(); - int n = receive(dst, connected); - while (n == IOStatus.UNAVAILABLE && isOpen()) { - long remainingNanos = nanos - (System.nanoTime() - startNanos); - if (remainingNanos <= 0) { - throw new SocketTimeoutException("Receive timed out"); - } - park(Net.POLLIN, remainingNanos); - n = receive(dst, connected); - } - if (n > 0 || (n == 0 && isOpen())) { - // sender address is in socket address buffer - sender = sourceSocketAddress(); - } - return sender; - } finally { - // restore socket to blocking mode (if channel is open) - tryLockedConfigureBlocking(true); + // release buffer if no datagram received + if (dst != null && (n < 0 || (n == 0 && !isOpen()))) { + Util.offerFirstTemporaryDirectBuffer(dst); + dst = null; } - } finally { - endRead(true, (sender != null)); } + return dst; } /** @@ -889,19 +906,54 @@ class DatagramChannelImpl } /** - * Sends a datagram from the bytes in given buffer. + * Sends a datagram. * * @apiNote This method is for use by the socket adaptor. * + * @throws IllegalArgumentException if not connected and target address not set * @throws IllegalBlockingModeException if the channel is non-blocking */ - void blockingSend(ByteBuffer src, SocketAddress target) throws IOException { + void blockingSend(DatagramPacket p) throws IOException { + Objects.requireNonNull(p); + writeLock.lock(); try { ensureOpen(); if (!isBlocking()) throw new IllegalBlockingModeException(); - send(src, target); + + ByteBuffer src = null; + try { + InetSocketAddress target; + synchronized (p) { + int len = p.getLength(); + src = Util.getTemporaryDirectBuffer(len); + + // copy bytes to temporary direct buffer + src.put(p.getData(), p.getOffset(), len); + src.flip(); + + // target address + if (p.getAddress() == null) { + InetSocketAddress remote = remoteAddress(); + if (remote == null) { + throw new IllegalArgumentException("Address not set"); + } + // set address/port to be compatible with long standing behavior + p.setAddress(remote.getAddress()); + p.setPort(remote.getPort()); + target = remote; + } else { + target = (InetSocketAddress) p.getSocketAddress(); + } + } + + // send the datagram (does not block) + send(src, target); + + } finally { + if (src != null) Util.offerFirstTemporaryDirectBuffer(src); + } } finally { writeLock.unlock(); } @@ -1198,12 +1250,12 @@ class DatagramChannelImpl } /** - * Ensures that the socket is configured non-blocking when on a virtual thread. + * Ensures that the socket is configured non-blocking. * @throws IOException if there is an I/O error changing the blocking mode */ - private void configureSocketNonBlockingIfVirtualThread() throws IOException { + private void configureSocketNonBlocking() throws IOException { assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); - if (!forcedNonBlocking && Thread.currentThread().isVirtual()) { + if (!forcedNonBlocking) { synchronized (stateLock) { ensureOpen(); IOUtil.configureBlocking(fd, false); @@ -1212,6 +1264,16 @@ class DatagramChannelImpl } } + /** + * Ensures that the socket is configured non-blocking when on a virtual thread. + * @throws IOException if there is an I/O error changing the blocking mode + */ + private void configureSocketNonBlockingIfVirtualThread() throws IOException { + if (Thread.currentThread().isVirtual()) { + configureSocketNonBlocking(); + } + } + InetSocketAddress localAddress() { synchronized (stateLock) { return localAddress; @@ -1952,6 +2014,44 @@ class DatagramChannelImpl }; } + /** + * Defines static methods to get/set DatagramPacket fields and workaround + * DatagramPacket deficiencies. + */ + private static class DatagramPackets { + private static final VarHandle LENGTH; + private static final VarHandle BUF_LENGTH; + static { + try { + PrivilegedExceptionAction pa = () -> + MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup()); + @SuppressWarnings("removal") + MethodHandles.Lookup l = AccessController.doPrivileged(pa); + LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class); + BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } + + /** + * Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be + * used at this time because it sets both the length and bufLength fields. + */ + static void setLength(DatagramPacket p, int value) { + assert Thread.holdsLock(p); + LENGTH.set(p, value); + } + + /** + * Returns the value of the DatagramPacket.bufLength field. + */ + static int getBufLength(DatagramPacket p) { + assert Thread.holdsLock(p); + return (int) BUF_LENGTH.get(p); + } + } + // -- Native methods -- private static native void disconnect0(FileDescriptor fd, boolean isIPv6) diff --git a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java index d59a4dc4b92..8826cc3c188 100644 --- a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java +++ b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -32,7 +32,6 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles.Lookup; import java.lang.invoke.MethodType; -import java.lang.invoke.VarHandle; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; @@ -44,7 +43,6 @@ import java.net.SocketException; import java.net.SocketOption; import java.net.SocketTimeoutException; import java.net.StandardSocketOptions; -import java.nio.ByteBuffer; import java.nio.channels.AlreadyConnectedException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedByInterruptException; @@ -56,7 +54,6 @@ import java.security.PrivilegedExceptionAction; import java.util.Objects; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; -import jdk.internal.misc.Blocker; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -192,79 +189,30 @@ public class DatagramSocketAdaptor @Override public void send(DatagramPacket p) throws IOException { - synchronized (p) { - int len = p.getLength(); - ByteBuffer bb = Util.getTemporaryDirectBuffer(len); - try { - // copy bytes to temporary direct buffer - bb.put(p.getData(), p.getOffset(), len); - bb.flip(); - - // target address - InetSocketAddress target; - if (p.getAddress() == null) { - InetSocketAddress remote = dc.remoteAddress(); - if (remote == null) { - // not specified by DatagramSocket - throw new IllegalArgumentException("Address not set"); - } - // set address/port to maintain compatibility with DatagramSocket - p.setAddress(remote.getAddress()); - p.setPort(remote.getPort()); - target = remote; - } else { - target = (InetSocketAddress) p.getSocketAddress(); - } - - // send datagram - dc.blockingSend(bb, target); - } catch (AlreadyConnectedException e) { - throw new IllegalArgumentException("Connected and packet address differ"); - } catch (ClosedChannelException e) { - throw new SocketException("Socket closed", e); - } finally { - Util.offerFirstTemporaryDirectBuffer(bb); - } + try { + dc.blockingSend(p); + } catch (AlreadyConnectedException e) { + throw new IllegalArgumentException("Connected and packet address differ"); + } catch (ClosedChannelException e) { + throw new SocketException("Socket closed", e); } } @Override public void receive(DatagramPacket p) throws IOException { - synchronized (p) { - // get temporary direct buffer with a capacity of p.bufLength - int bufLength = DatagramPackets.getBufLength(p); - ByteBuffer bb = Util.getTemporaryDirectBuffer(bufLength); - try { - SocketAddress sender; - long comp = Blocker.begin(); - try { - sender = dc.blockingReceive(bb, MILLISECONDS.toNanos(timeout)); - } finally { - Blocker.end(comp); - } - bb.flip(); - - // copy bytes to the DatagramPacket and set length - int len = Math.min(bb.limit(), DatagramPackets.getBufLength(p)); - bb.get(p.getData(), p.getOffset(), len); - DatagramPackets.setLength(p, len); - - // sender address - p.setSocketAddress(sender); - } catch (SocketTimeoutException | ClosedByInterruptException e) { - throw e; - } catch (InterruptedIOException e) { - Thread thread = Thread.currentThread(); - if (thread.isVirtual() && thread.isInterrupted()) { - close(); - throw new SocketException("Closed by interrupt"); - } - throw e; - } catch (ClosedChannelException e) { - throw new SocketException("Socket closed", e); - } finally { - Util.offerFirstTemporaryDirectBuffer(bb); + try { + dc.blockingReceive(p, MILLISECONDS.toNanos(timeout)); + } catch (SocketTimeoutException | ClosedByInterruptException e) { + throw e; + } catch (InterruptedIOException e) { + Thread thread = Thread.currentThread(); + if (thread.isVirtual() && thread.isInterrupted()) { + close(); + throw new SocketException("Closed by interrupt"); } + throw e; + } catch (ClosedChannelException e) { + throw new SocketException("Socket closed", e); } } @@ -704,44 +652,6 @@ public class DatagramSocketAdaptor return new InetSocketAddress(0).getAddress(); } - /** - * Defines static methods to get/set DatagramPacket fields and workaround - * DatagramPacket deficiencies. - */ - private static class DatagramPackets { - private static final VarHandle LENGTH; - private static final VarHandle BUF_LENGTH; - static { - try { - PrivilegedExceptionAction pa = () -> - MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup()); - @SuppressWarnings("removal") - MethodHandles.Lookup l = AccessController.doPrivileged(pa); - LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class); - BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class); - } catch (Exception e) { - throw new ExceptionInInitializerError(e); - } - } - - /** - * Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be - * used at this time because it sets both the length and bufLength fields. - */ - static void setLength(DatagramPacket p, int value) { - assert Thread.holdsLock(p); - LENGTH.set(p, value); - } - - /** - * Returns the value of the DatagramPacket.bufLength field. - */ - static int getBufLength(DatagramPacket p) { - assert Thread.holdsLock(p); - return (int) BUF_LENGTH.get(p); - } - } - /** * Defines static methods to invoke non-public NetworkInterface methods. */ diff --git a/test/jdk/java/net/DatagramSocket/TimeoutWithSM.java b/test/jdk/java/net/DatagramSocket/TimeoutWithSM.java new file mode 100644 index 00000000000..c42fa151f70 --- /dev/null +++ b/test/jdk/java/net/DatagramSocket/TimeoutWithSM.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2023, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/** + * @test + * @summary Test a timed DatagramSocket.receive with a SecurityManager set + * @run main/othervm -Djava.security.manager=allow TimeoutWithSM + */ + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.security.Permission; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TimeoutWithSM { + + private static final int TIMEOUT = 10_000; + + public static void main(String[] args) throws Exception { + try (var socket = new DatagramSocket(null)) { + InetAddress lb = InetAddress.getLoopbackAddress(); + socket.bind(new InetSocketAddress(lb, 0)); + + // start sender to send datagrams to us + var done = new AtomicBoolean(); + startSender(socket.getLocalSocketAddress(), done); + + // set a SecurityManager that blocks datagrams from sender + System.setSecurityManager(new SecurityManager() { + @Override + public void checkPermission(Permission p) { + } + @Override + public void checkAccept(String host, int port) { + var isa = new InetSocketAddress(host, port); + System.out.println("checkAccept " + isa); + throw new SecurityException(); + } + }); + + // timed receive, should throw SocketTimeoutException + try { + socket.setSoTimeout(TIMEOUT); + try { + byte[] bytes = new byte[1024]; + DatagramPacket p = new DatagramPacket(bytes, bytes.length); + socket.receive(p); + throw new RuntimeException("Packet received, unexpected!!! " + + " sender=" + p.getSocketAddress() + ", len=" + p.getLength()); + } catch (SocketTimeoutException expected) { + System.out.println(expected + ", expected!!!"); + } + } finally { + done.set(true); + } + } + } + + /** + * Start a thread to send datagrams to the given target address at intervals of + * one second. The sender stops when done is set to true. + */ + static void startSender(SocketAddress target, AtomicBoolean done) throws Exception { + assert target instanceof InetSocketAddress isa && isa.getAddress().isLoopbackAddress(); + var sender = new DatagramSocket(null); + boolean started = false; + try { + InetAddress lb = InetAddress.getLoopbackAddress(); + sender.bind(new InetSocketAddress(lb, 0)); + Thread.ofPlatform().start(() -> { + try { + try (sender) { + byte[] bytes = "hello".getBytes("UTF-8"); + DatagramPacket p = new DatagramPacket(bytes, bytes.length); + p.setSocketAddress(target); + while (!done.get()) { + System.out.println("Send datagram to " + target + " ..."); + sender.send(p); + Thread.sleep(Duration.ofSeconds(1)); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + started = true; + } finally { + if (!started) { + sender.close(); + } + } + } +}