8284161: Implementation of Virtual Threads (Preview)

Co-authored-by: Ron Pressler <rpressler@openjdk.org>
Co-authored-by: Alan Bateman <alanb@openjdk.org>
Co-authored-by: Erik Österlund <eosterlund@openjdk.org>
Co-authored-by: Andrew Haley <aph@openjdk.org>
Co-authored-by: Rickard Bäckman <rbackman@openjdk.org>
Co-authored-by: Markus Grönlund <mgronlun@openjdk.org>
Co-authored-by: Leonid Mesnik <lmesnik@openjdk.org>
Co-authored-by: Serguei Spitsyn <sspitsyn@openjdk.org>
Co-authored-by: Chris Plummer <cjplummer@openjdk.org>
Co-authored-by: Coleen Phillimore <coleenp@openjdk.org>
Co-authored-by: Robbin Ehn <rehn@openjdk.org>
Co-authored-by: Stefan Karlsson <stefank@openjdk.org>
Co-authored-by: Thomas Schatzl <tschatzl@openjdk.org>
Co-authored-by: Sergey Kuksenko <skuksenko@openjdk.org>
Reviewed-by: lancea, eosterlund, rehn, sspitsyn, stefank, tschatzl, dfuchs, lmesnik, dcubed, kevinw, amenkov, dlong, mchung, psandoz, bpb, coleenp, smarks, egahlin, mseledtsov, coffeys, darcy
This commit is contained in:
Alan Bateman 2022-05-07 08:06:16 +00:00
parent 5212535a27
commit 9583e3657e
1133 changed files with 95935 additions and 8335 deletions

View file

@ -768,9 +768,12 @@ public class HttpClient extends NetworkClient {
closeServer();
cachedHttpClient = false;
if (!failedOnce && requests != null) {
Thread thread = Thread.currentThread();
boolean doNotRetry = thread.isVirtual() && thread.isInterrupted();
failedOnce = true;
if (getRequestMethod().equals("CONNECT")
|| streaming
|| doNotRetry
|| (httpuc.getRequestMethod().equals("POST")
&& !retryPostProp)) {
// do not retry the request

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -27,6 +27,7 @@ package sun.nio.ch;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
@ -66,6 +67,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@ -159,6 +161,13 @@ class DatagramChannelImpl
// set true/false when socket is already bound and SO_REUSEADDR is emulated
private boolean isReuseAddress;
// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
// blocking mode and the channel's socket is in non-blocking mode then
// operations that don't complete immediately will poll the socket and
// preserve the semantics of blocking operations.
private volatile boolean forcedNonBlocking;
// -- End of fields protected by stateLock
@ -470,6 +479,26 @@ class DatagramChannelImpl
return DefaultOptionsHolder.defaultOptions;
}
@Override
public void park(int event, long nanos) throws IOException {
Thread thread = Thread.currentThread();
if (thread.isVirtual()) {
Poller.poll(getFDVal(), event, nanos, this::isOpen);
// DatagramSocket throws when virtual thread interrupted
if (!interruptible && thread.isInterrupted()) {
throw new InterruptedIOException();
}
} else {
long millis;
if (nanos == 0) {
millis = -1;
} else {
millis = TimeUnit.NANOSECONDS.toMillis(nanos);
}
Net.poll(getFD(), event, millis);
}
}
/**
* Marks the beginning of a read operation that might block.
*
@ -535,6 +564,7 @@ class DatagramChannelImpl
SocketAddress sender = null;
try {
SocketAddress remote = beginRead(blocking, false);
configureSocketNonBlockingIfVirtualThread();
boolean connected = (remote != null);
@SuppressWarnings("removal")
SecurityManager sm = System.getSecurityManager();
@ -577,17 +607,12 @@ class DatagramChannelImpl
assert readLock.isHeldByCurrentThread()
&& sm != null && remoteAddress == null;
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
boolean blocking = isBlocking();
for (;;) {
int n = receive(bb, false);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
park(Net.POLLIN);
n = receive(bb, false);
}
}
boolean blocking = isBlocking();
for (;;) {
int n;
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
n = receive(bb, false);
if (n >= 0) {
// sender address is in socket address buffer
InetSocketAddress isa = sourceSocketAddress();
@ -598,14 +623,17 @@ class DatagramChannelImpl
return isa;
} catch (SecurityException se) {
// ignore datagram
bb.clear();
}
} else {
return null;
}
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
if (blocking && IOStatus.okayToRetry(n) && isOpen()) {
park(Net.POLLIN);
} else {
return null;
}
} finally {
Util.releaseTemporaryDirectBuffer(bb);
}
}
@ -662,6 +690,7 @@ class DatagramChannelImpl
SocketAddress sender = null;
try {
SocketAddress remote = beginRead(true, false);
configureSocketNonBlockingIfVirtualThread();
boolean connected = (remote != null);
int n = receive(dst, connected);
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -719,6 +748,10 @@ class DatagramChannelImpl
}
}
/**
* Receives a datagram into the buffer.
* @param connected true if the channel is connected
*/
private int receive(ByteBuffer dst, boolean connected) throws IOException {
int pos = dst.position();
int lim = dst.limit();
@ -789,6 +822,7 @@ class DatagramChannelImpl
boolean completed = false;
try {
SocketAddress remote = beginWrite(blocking, false);
configureSocketNonBlockingIfVirtualThread();
if (remote != null) {
// connected
if (!target.equals(remote)) {
@ -937,6 +971,7 @@ class DatagramChannelImpl
int n = 0;
try {
beginRead(blocking, true);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.read(fd, buf, -1, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -966,6 +1001,7 @@ class DatagramChannelImpl
long n = 0;
try {
beginRead(blocking, true);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.read(fd, dsts, offset, length, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -1048,6 +1084,7 @@ class DatagramChannelImpl
int n = 0;
try {
beginWrite(blocking, true);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.write(fd, buf, -1, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -1077,6 +1114,7 @@ class DatagramChannelImpl
long n = 0;
try {
beginWrite(blocking, true);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.write(fd, srcs, offset, length, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -1116,21 +1154,21 @@ class DatagramChannelImpl
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, block);
// do nothing if virtual thread has forced the socket to be non-blocking
if (!forcedNonBlocking) {
IOUtil.configureBlocking(fd, block);
}
}
}
/**
* Adjusts the blocking mode if the channel is open. readLock or writeLock
* must already be held.
*
* @return {@code true} if the blocking mode was adjusted, {@code false} if
* the blocking mode was not adjusted because the channel is closed
* Attempts to adjust the blocking mode if the channel is open.
* @return {@code true} if the blocking mode was adjusted
*/
private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
synchronized (stateLock) {
if (isOpen()) {
if (!forcedNonBlocking && isOpen()) {
IOUtil.configureBlocking(fd, block);
return true;
} else {
@ -1139,6 +1177,21 @@ class DatagramChannelImpl
}
}
/**
* Ensures that the socket is configured non-blocking when on a virtual thread.
* @throws IOException if there is an I/O error changing the blocking mode
*/
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, false);
forcedNonBlocking = true;
}
}
}
InetSocketAddress localAddress() {
synchronized (stateLock) {
return localAddress;
@ -1263,7 +1316,7 @@ class DatagramChannelImpl
// flush any packets already received.
boolean blocking = isBlocking();
if (blocking) {
IOUtil.configureBlocking(fd, false);
lockedConfigureBlocking(false);
}
try {
ByteBuffer buf = ByteBuffer.allocate(100);
@ -1272,7 +1325,7 @@ class DatagramChannelImpl
}
} finally {
if (blocking) {
IOUtil.configureBlocking(fd, true);
tryLockedConfigureBlocking(true);
}
}
}
@ -1375,7 +1428,7 @@ class DatagramChannelImpl
}
// copy the blocking mode
if (!isBlocking()) {
if (!isBlocking() || forcedNonBlocking) {
IOUtil.configureBlocking(newfd, false);
}
@ -1732,11 +1785,18 @@ class DatagramChannelImpl
long reader = readerThread;
long writer = writerThread;
if (reader != 0 || writer != 0) {
nd.preClose(fd);
if (reader != 0)
NativeThread.signal(reader);
if (writer != 0)
NativeThread.signal(writer);
if (NativeThread.isVirtualThread(reader)
|| NativeThread.isVirtualThread(writer)) {
Poller.stopPoll(fdVal);
}
if (NativeThread.isNativeThread(reader)
|| NativeThread.isNativeThread(writer)) {
nd.preClose(fd);
if (NativeThread.isNativeThread(reader))
NativeThread.signal(reader);
if (NativeThread.isNativeThread(writer))
NativeThread.signal(writer);
}
}
}
}

View file

@ -26,6 +26,7 @@
package sun.nio.ch;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
@ -36,15 +37,17 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOption;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AlreadyConnectedException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.MembershipKey;
import java.security.AccessController;
@ -53,6 +56,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import jdk.internal.misc.Blocker;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@ -188,17 +192,16 @@ public class DatagramSocketAdaptor
@Override
public void send(DatagramPacket p) throws IOException {
ByteBuffer bb = null;
try {
InetSocketAddress target;
synchronized (p) {
synchronized (p) {
int len = p.getLength();
ByteBuffer bb = Util.getTemporaryDirectBuffer(len);
try {
// copy bytes to temporary direct buffer
int len = p.getLength();
bb = Util.getTemporaryDirectBuffer(len);
bb.put(p.getData(), p.getOffset(), len);
bb.flip();
// target address
InetSocketAddress target;
if (p.getAddress() == null) {
InetSocketAddress remote = dc.remoteAddress();
if (remote == null) {
@ -212,17 +215,14 @@ public class DatagramSocketAdaptor
} else {
target = (InetSocketAddress) p.getSocketAddress();
}
}
// send datagram
try {
// send datagram
dc.blockingSend(bb, target);
} catch (AlreadyConnectedException e) {
throw new IllegalArgumentException("Connected and packet address differ");
} catch (ClosedChannelException e) {
throw new SocketException("Socket closed", e);
}
} finally {
if (bb != null) {
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
@ -230,14 +230,20 @@ public class DatagramSocketAdaptor
@Override
public void receive(DatagramPacket p) throws IOException {
// get temporary direct buffer with a capacity of p.bufLength
int bufLength = DatagramPackets.getBufLength(p);
ByteBuffer bb = Util.getTemporaryDirectBuffer(bufLength);
try {
long nanos = MILLISECONDS.toNanos(timeout);
SocketAddress sender = dc.blockingReceive(bb, nanos);
bb.flip();
synchronized (p) {
synchronized (p) {
// get temporary direct buffer with a capacity of p.bufLength
int bufLength = DatagramPackets.getBufLength(p);
ByteBuffer bb = Util.getTemporaryDirectBuffer(bufLength);
try {
SocketAddress sender;
long comp = Blocker.begin();
try {
sender = dc.blockingReceive(bb, MILLISECONDS.toNanos(timeout));
} finally {
Blocker.end(comp);
}
bb.flip();
// copy bytes to the DatagramPacket and set length
int len = Math.min(bb.limit(), DatagramPackets.getBufLength(p));
bb.get(p.getData(), p.getOffset(), len);
@ -245,11 +251,20 @@ public class DatagramSocketAdaptor
// sender address
p.setSocketAddress(sender);
} catch (SocketTimeoutException | ClosedByInterruptException e) {
throw e;
} catch (InterruptedIOException e) {
Thread thread = Thread.currentThread();
if (thread.isVirtual() && thread.isInterrupted()) {
close();
throw new SocketException("Closed by interrupt");
}
throw e;
} catch (ClosedChannelException e) {
throw new SocketException("Socket closed", e);
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
} catch (ClosedChannelException e) {
throw new SocketException("Socket closed", e);
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
@ -698,7 +713,7 @@ public class DatagramSocketAdaptor
private static final VarHandle BUF_LENGTH;
static {
try {
PrivilegedExceptionAction<Lookup> pa = () ->
PrivilegedExceptionAction<MethodHandles.Lookup> pa = () ->
MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup());
@SuppressWarnings("removal")
MethodHandles.Lookup l = AccessController.doPrivileged(pa);
@ -714,18 +729,16 @@ public class DatagramSocketAdaptor
* used at this time because it sets both the length and bufLength fields.
*/
static void setLength(DatagramPacket p, int value) {
synchronized (p) {
LENGTH.set(p, value);
}
assert Thread.holdsLock(p);
LENGTH.set(p, value);
}
/**
* Returns the value of the DatagramPacket.bufLength field.
*/
static int getBufLength(DatagramPacket p) {
synchronized (p) {
return (int) BUF_LENGTH.get(p);
}
assert Thread.holdsLock(p);
return (int) BUF_LENGTH.get(p);
}
}
@ -774,23 +787,21 @@ public class DatagramSocketAdaptor
}
/**
* Provides access to the value of the private static DatagramSocket.NO_DELEGATE
* Provides access to non-public constants in DatagramSocket.
*/
private static class DatagramSockets {
private static final SocketAddress NO_DELEGATE;
static {
try {
PrivilegedExceptionAction<Lookup> pa = () ->
MethodHandles.privateLookupIn(DatagramSocket.class, MethodHandles.lookup());
MethodHandles.privateLookupIn(DatagramSocket.class, MethodHandles.lookup());
@SuppressWarnings("removal")
MethodHandles.Lookup l = AccessController.doPrivileged(pa);
NO_DELEGATE = (SocketAddress)
l.findStaticVarHandle(DatagramSocket.class, "NO_DELEGATE",
SocketAddress.class).get();
var handle = l.findStaticVarHandle(DatagramSocket.class, "NO_DELEGATE", SocketAddress.class);
NO_DELEGATE = (SocketAddress) handle.get();
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
}
}
}

View file

@ -46,6 +46,7 @@ import java.util.Objects;
import jdk.internal.access.JavaIOFileDescriptorAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.Blocker;
import jdk.internal.misc.ExtendedMapMode;
import jdk.internal.misc.Unsafe;
import jdk.internal.misc.VM;
@ -229,7 +230,12 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
n = IOUtil.read(fd, dst, -1, direct, alignment, nd);
long comp = Blocker.begin();
try {
n = IOUtil.read(fd, dst, -1, direct, alignment, nd);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@ -258,8 +264,13 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
n = IOUtil.read(fd, dsts, offset, length,
direct, alignment, nd);
long comp = Blocker.begin();
try {
n = IOUtil.read(fd, dsts, offset, length, direct, alignment, nd);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@ -285,7 +296,13 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
n = IOUtil.write(fd, src, -1, direct, alignment, nd);
long comp = Blocker.begin();
try {
n = IOUtil.write(fd, src, -1, direct, alignment, nd);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@ -314,8 +331,12 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
n = IOUtil.write(fd, srcs, offset, length,
direct, alignment, nd);
long comp = Blocker.begin();
try {
n = IOUtil.write(fd, srcs, offset, length, direct, alignment, nd);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@ -340,8 +361,13 @@ public class FileChannelImpl
return 0;
boolean append = fdAccess.getAppend(fd);
do {
// in append-mode then position is advanced to end before writing
p = (append) ? nd.size(fd) : nd.seek(fd, -1);
long comp = Blocker.begin();
try {
// in append-mode then position is advanced to end before writing
p = (append) ? nd.size(fd) : nd.seek(fd, -1);
} finally {
Blocker.end(comp);
}
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(p);
} finally {
@ -365,7 +391,12 @@ public class FileChannelImpl
if (!isOpen())
return null;
do {
p = nd.seek(fd, newPosition);
long comp = Blocker.begin();
try {
p = nd.seek(fd, newPosition);
} finally {
Blocker.end(comp);
}
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return this;
} finally {
@ -387,7 +418,12 @@ public class FileChannelImpl
if (!isOpen())
return -1;
do {
s = nd.size(fd);
long comp = Blocker.begin();
try {
s = nd.size(fd);
} finally {
Blocker.end(comp);
}
} while ((s == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(s);
} finally {
@ -418,14 +454,24 @@ public class FileChannelImpl
// get current size
long size;
do {
size = nd.size(fd);
long comp = Blocker.begin();
try {
size = nd.size(fd);
} finally {
Blocker.end(comp);
}
} while ((size == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
// get current position
do {
p = nd.seek(fd, -1);
long comp = Blocker.begin();
try {
p = nd.seek(fd, -1);
} finally {
Blocker.end(comp);
}
} while ((p == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -434,7 +480,12 @@ public class FileChannelImpl
// truncate file if given size is less than the current size
if (newSize < size) {
do {
rv = nd.truncate(fd, newSize);
long comp = Blocker.begin();
try {
rv = nd.truncate(fd, newSize);
} finally {
Blocker.end(comp);
}
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -444,7 +495,12 @@ public class FileChannelImpl
if (p > newSize)
p = newSize;
do {
rp = nd.seek(fd, p);
long comp = Blocker.begin();
try {
rp = nd.seek(fd, p);
} finally {
Blocker.end(comp);
}
} while ((rp == IOStatus.INTERRUPTED) && isOpen());
return this;
} finally {
@ -465,7 +521,12 @@ public class FileChannelImpl
if (!isOpen())
return;
do {
rv = nd.force(fd, metaData);
long comp = Blocker.begin();
try {
rv = nd.force(fd, metaData);
} finally {
Blocker.end(comp);
}
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
} finally {
threads.remove(ti);
@ -505,7 +566,12 @@ public class FileChannelImpl
if (!isOpen())
return -1;
do {
n = transferTo0(fd, position, icount, targetFD);
long comp = Blocker.begin();
try {
n = transferTo0(fd, position, icount, targetFD);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNSUPPORTED_CASE) {
if (target instanceof SinkChannelImpl)
@ -843,7 +909,12 @@ public class FileChannelImpl
if (!isOpen())
return -1;
do {
n = IOUtil.read(fd, dst, position, direct, alignment, nd);
long comp = Blocker.begin();
try {
n = IOUtil.read(fd, dst, position, direct, alignment, nd);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@ -882,7 +953,12 @@ public class FileChannelImpl
if (!isOpen())
return -1;
do {
n = IOUtil.write(fd, src, position, direct, alignment, nd);
long comp = Blocker.begin();
try {
n = IOUtil.write(fd, src, position, direct, alignment, nd);
} finally {
Blocker.end(comp);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
@ -1090,7 +1166,12 @@ public class FileChannelImpl
synchronized (positionLock) {
long filesize;
do {
filesize = nd.size(fd);
long comp = Blocker.begin();
try {
filesize = nd.size(fd);
} finally {
Blocker.end(comp);
}
} while ((filesize == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -1102,7 +1183,12 @@ public class FileChannelImpl
}
int rv;
do {
rv = nd.truncate(fd, position + size);
long comp = Blocker.begin();
try {
rv = nd.truncate(fd, position + size);
} finally {
Blocker.end(comp);
}
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -1292,7 +1378,12 @@ public class FileChannelImpl
return null;
int n;
do {
n = nd.lock(fd, true, position, size, shared);
long comp = Blocker.begin();
try {
n = nd.lock(fd, true, position, size, shared);
} finally {
Blocker.end(comp);
}
} while ((n == FileDispatcher.INTERRUPTED) && isOpen());
if (isOpen()) {
if (n == FileDispatcher.RET_EX_LOCK) {

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -26,8 +26,10 @@
package sun.nio.ch;
import java.nio.ByteBuffer;
import jdk.internal.ref.CleanerFactory;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.ref.CleanerFactory;
/**
* Manipulates a native array of iovec structs on Solaris:
@ -44,6 +46,7 @@ import jdk.internal.ref.CleanerFactory;
*/
class IOVecWrapper {
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
// Miscellaneous constants
private static final int BASE_OFFSET = 0;
@ -81,8 +84,7 @@ class IOVecWrapper {
}
// per thread IOVecWrapper
private static final ThreadLocal<IOVecWrapper> cached =
new ThreadLocal<IOVecWrapper>();
private static final ThreadLocal<IOVecWrapper> cached = new ThreadLocal<>();
private IOVecWrapper(int size) {
this.size = size;
@ -95,7 +97,7 @@ class IOVecWrapper {
}
static IOVecWrapper get(int size) {
IOVecWrapper wrapper = cached.get();
IOVecWrapper wrapper = JLA.getCarrierThreadLocal(cached);
if (wrapper != null && wrapper.size < size) {
// not big enough; eagerly release memory
wrapper.vecArray.free();
@ -104,7 +106,7 @@ class IOVecWrapper {
if (wrapper == null) {
wrapper = new IOVecWrapper(size);
CleanerFactory.cleaner().register(wrapper, new Deallocator(wrapper.vecArray));
cached.set(wrapper);
JLA.setCarrierThreadLocal(cached, wrapper);
}
return wrapper;
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2002, 2012, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2002, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,7 +25,6 @@
package sun.nio.ch;
// Special-purpose data structure for sets of native threads
@ -39,11 +38,12 @@ class NativeThreadSet {
elts = new long[n];
}
// Adds the current native thread to this set, returning its index so that
// it can efficiently be removed later.
//
/**
* Adds the current native thread to this set, returning its index so that
* it can efficiently be removed later.
*/
int add() {
long th = NativeThread.current();
long th = NativeThread.currentNativeThread();
// 0 and -1 are treated as placeholders, not real thread handles
if (th == 0)
th = -1;
@ -67,12 +67,15 @@ class NativeThreadSet {
assert false;
return -1;
}
}
// Removes the thread at the given index.
//
/**
* Removes the thread at the give index.
*/
void remove(int i) {
synchronized (this) {
assert (elts[i] == NativeThread.currentNativeThread()) || (elts[i] == -1);
elts[i] = 0;
used--;
if (used == 0 && waitingToEmpty)

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -28,6 +28,7 @@ package sun.nio.ch;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.ref.Cleaner.Cleanable;
@ -50,6 +51,8 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import jdk.internal.access.JavaIOFileDescriptorAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.ref.CleanerFactory;
import sun.net.ConnectionResetException;
import sun.net.NetHooks;
@ -64,13 +67,11 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* NIO based SocketImpl.
*
* The underlying socket used by this SocketImpl is initially configured
* blocking. If the connect method is used to establish a connection with a
* timeout then the socket is configured non-blocking for the connect attempt,
* and then restored to blocking mode when the connection is established.
* If the accept or read methods are used with a timeout then the socket is
* configured non-blocking and is never restored. When in non-blocking mode,
* operations that don't complete immediately will poll the socket and preserve
* The underlying socket used by this SocketImpl is initially configured blocking.
* If a connect, accept or read is attempted with a timeout, or a virtual
* thread invokes a blocking operation, then the socket is changed to non-blocking
* When in non-blocking mode, operations that don't complete immediately will
* poll the socket (or park when invoked on a virtual thread) and preserve
* the semantics of blocking operations.
*/
@ -163,19 +164,27 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
}
/**
* Disables the current thread for scheduling purposes until the socket is
* ready for I/O, or is asynchronously closed, for up to the specified
* waiting time.
* Disables the current thread for scheduling purposes until the
* socket is ready for I/O or is asynchronously closed, for up to the
* specified waiting time.
* @throws IOException if an I/O error occurs
*/
private void park(FileDescriptor fd, int event, long nanos) throws IOException {
long millis;
if (nanos == 0) {
millis = -1;
Thread t = Thread.currentThread();
if (t.isVirtual()) {
Poller.poll(fdVal(fd), event, nanos, this::isOpen);
if (t.isInterrupted()) {
throw new InterruptedIOException();
}
} else {
millis = NANOSECONDS.toMillis(nanos);
long millis;
if (nanos == 0) {
millis = -1;
} else {
millis = NANOSECONDS.toMillis(nanos);
}
Net.poll(fd, event, millis);
}
Net.poll(fd, event, millis);
}
/**
@ -188,34 +197,18 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
}
/**
* Configures the socket to blocking mode. This method is a no-op if the
* socket is already in blocking mode.
* @throws IOException if closed or there is an I/O error changing the mode
* Ensures that the socket is configured non-blocking invoked on a virtual
* thread or the operation has a timeout
* @throws IOException if there is an I/O error changing the blocking mode
*/
private void configureBlocking(FileDescriptor fd) throws IOException {
assert readLock.isHeldByCurrentThread();
if (nonBlocking) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, true);
nonBlocking = false;
}
}
}
/**
* Configures the socket to non-blocking mode. This method is a no-op if the
* socket is already in non-blocking mode.
* @throws IOException if closed or there is an I/O error changing the mode
*/
private void configureNonBlocking(FileDescriptor fd) throws IOException {
assert readLock.isHeldByCurrentThread();
if (!nonBlocking) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, false);
nonBlocking = true;
}
private void configureNonBlockingIfNeeded(FileDescriptor fd, boolean timed)
throws IOException
{
if (!nonBlocking
&& (timed || Thread.currentThread().isVirtual())) {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
IOUtil.configureBlocking(fd, false);
nonBlocking = true;
}
}
@ -300,9 +293,9 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
if (isInputClosed)
return -1;
int timeout = this.timeout;
configureNonBlockingIfNeeded(fd, timeout > 0);
if (timeout > 0) {
// read with timeout
configureNonBlocking(fd);
n = timedRead(fd, b, off, len, MILLISECONDS.toNanos(timeout));
} else {
// read, no timeout
@ -313,7 +306,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
}
}
return n;
} catch (SocketTimeoutException e) {
} catch (InterruptedIOException e) {
throw e;
} catch (ConnectionResetException e) {
connectionReset = true;
@ -407,12 +400,15 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
int n = 0;
FileDescriptor fd = beginWrite();
try {
configureNonBlockingIfNeeded(fd, false);
n = tryWrite(fd, b, off, len);
while (IOStatus.okayToRetry(n) && isOpen()) {
park(fd, Net.POLLOUT);
n = tryWrite(fd, b, off, len);
}
return n;
} catch (InterruptedIOException e) {
throw e;
} catch (IOException ioe) {
throw new SocketException(ioe.getMessage());
} finally {
@ -576,12 +572,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
boolean connected = false;
FileDescriptor fd = beginConnect(address, port);
try {
// configure socket to non-blocking mode when there is a timeout
if (millis > 0) {
configureNonBlocking(fd);
}
configureNonBlockingIfNeeded(fd, millis > 0);
int n = Net.connect(fd, address, port);
if (n > 0) {
// connection established
@ -602,12 +593,6 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
connected = polled && isOpen();
}
}
// restore socket to blocking mode
if (connected && millis > 0) {
configureBlocking(fd);
}
} finally {
endConnect(fd, connected);
}
@ -616,7 +601,11 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
}
} catch (IOException ioe) {
close();
throw SocketExceptions.of(ioe, isa);
if (ioe instanceof InterruptedIOException) {
throw ioe;
} else {
throw SocketExceptions.of(ioe, isa);
}
}
}
@ -743,9 +732,9 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
int n = 0;
FileDescriptor fd = beginAccept();
try {
configureNonBlockingIfNeeded(fd, remainingNanos > 0);
if (remainingNanos > 0) {
// accept with timeout
configureNonBlocking(fd);
n = timedAccept(fd, newfd, isaa, remainingNanos);
} else {
// accept, no timeout
@ -887,27 +876,37 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
this.state = ST_CLOSED;
return;
}
boolean connected = (state == ST_CONNECTED);
this.state = ST_CLOSING;
// shutdown output when linger interval not set to 0
try {
var SO_LINGER = StandardSocketOptions.SO_LINGER;
if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
Net.shutdown(fd, Net.SHUT_WR);
}
} catch (IOException ignore) { }
if (connected) {
try {
var SO_LINGER = StandardSocketOptions.SO_LINGER;
if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
Net.shutdown(fd, Net.SHUT_WR);
}
} catch (IOException ignore) { }
}
// attempt to close the socket. If there are I/O operations in progress
// then the socket is pre-closed and the thread(s) signalled. The
// last thread will close the file descriptor.
if (!tryClose()) {
nd.preClose(fd);
long reader = readerThread;
if (reader != 0)
NativeThread.signal(reader);
long writer = writerThread;
if (writer != 0)
NativeThread.signal(writer);
if (NativeThread.isVirtualThread(reader)
|| NativeThread.isVirtualThread(writer)) {
Poller.stopPoll(fdVal(fd));
}
if (NativeThread.isNativeThread(reader)
|| NativeThread.isNativeThread(writer)) {
nd.preClose(fd);
if (NativeThread.isNativeThread(reader))
NativeThread.signal(reader);
if (NativeThread.isNativeThread(writer))
NativeThread.signal(writer);
}
}
}
}
@ -1148,6 +1147,9 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
ensureOpenAndConnected();
if (!isInputClosed) {
Net.shutdown(fd, Net.SHUT_RD);
if (NativeThread.isVirtualThread(readerThread)) {
Poller.stopPoll(fdVal(fd), Net.POLLIN);
}
isInputClosed = true;
}
}
@ -1159,6 +1161,9 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
ensureOpenAndConnected();
if (!isOutputClosed) {
Net.shutdown(fd, Net.SHUT_WR);
if (NativeThread.isVirtualThread(writerThread)) {
Poller.stopPoll(fdVal(fd), Net.POLLOUT);
}
isOutputClosed = true;
}
}
@ -1176,6 +1181,7 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
int n = 0;
FileDescriptor fd = beginWrite();
try {
configureNonBlockingIfNeeded(fd, false);
do {
n = Net.sendOOB(fd, (byte) data);
} while (n == IOStatus.INTERRUPTED && isOpen());
@ -1253,4 +1259,13 @@ public final class NioSocketImpl extends SocketImpl implements PlatformSocketImp
return StandardProtocolFamily.INET;
}
}
/**
* Return the file descriptor value.
*/
private static int fdVal(FileDescriptor fd) {
return JIOFDA.get(fd);
}
private static final JavaIOFileDescriptorAccess JIOFDA = SharedSecrets.getJavaIOFileDescriptorAccess();
}

View file

@ -0,0 +1,469 @@
/*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package sun.nio.ch;
import java.io.IOError;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import jdk.internal.misc.InnocuousThread;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import sun.security.action.GetPropertyAction;
/**
* Polls file descriptors. Virtual threads invoke the poll method to park
* until a given file descriptor is ready for I/O.
*/
public abstract class Poller {
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
private static final Poller[] READ_POLLERS;
private static final Poller[] WRITE_POLLERS;
private static final int READ_MASK, WRITE_MASK;
private static final boolean USE_DIRECT_REGISTER;
// true if this is a poller for reading, false for writing
private final boolean read;
// maps file descriptors to parked Thread
private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
// the queue of updates to the updater Thread
private final BlockingQueue<Request> queue = new LinkedTransferQueue<>();
/**
* Initialize a Poller for reading or writing.
*/
protected Poller(boolean read) {
this.read = read;
}
/**
* Returns true if this poller is for read (POLLIN) events.
*/
final boolean reading() {
return read;
}
/**
* Parks the current thread until a file descriptor is ready for the given op.
* @param fdVal the file descriptor
* @param event POLLIN or POLLOUT
* @param nanos the waiting time or 0 to wait indefinitely
* @param supplier supplies a boolean to indicate if the enclosing object is open
*/
public static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
throws IOException
{
assert nanos >= 0L;
if (event == Net.POLLIN) {
readPoller(fdVal).poll(fdVal, nanos, supplier);
} else if (event == Net.POLLOUT) {
writePoller(fdVal).poll(fdVal, nanos, supplier);
} else {
assert false;
}
}
/**
* Parks the current thread until a file descriptor is ready.
*/
private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
if (USE_DIRECT_REGISTER) {
poll1(fdVal, nanos, supplier);
} else {
poll2(fdVal, nanos, supplier);
}
}
/**
* Parks the current thread until a file descriptor is ready. This implementation
* registers the file descriptor, then parks until the file descriptor is polled.
*/
private void poll1(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
register(fdVal);
try {
boolean isOpen = supplier.getAsBoolean();
if (isOpen) {
if (nanos > 0) {
LockSupport.parkNanos(nanos);
} else {
LockSupport.park();
}
}
} finally {
deregister(fdVal);
}
}
/**
* Parks the current thread until a file descriptor is ready. This implementation
* queues the file descriptor to the update thread, then parks until the file
* descriptor is polled.
*/
private void poll2(int fdVal, long nanos, BooleanSupplier supplier) {
Request request = registerAsync(fdVal);
try {
boolean isOpen = supplier.getAsBoolean();
if (isOpen) {
if (nanos > 0) {
LockSupport.parkNanos(nanos);
} else {
LockSupport.park();
}
}
} finally {
request.awaitFinish();
deregister(fdVal);
}
}
/**
* Registers the file descriptor.
*/
private void register(int fdVal) throws IOException {
Thread previous = map.putIfAbsent(fdVal, Thread.currentThread());
assert previous == null;
implRegister(fdVal);
}
/**
* Queues the file descriptor to be registered by the updater thread, returning
* a Request object to track the request.
*/
private Request registerAsync(int fdVal) {
Thread previous = map.putIfAbsent(fdVal, Thread.currentThread());
assert previous == null;
Request request = new Request(fdVal);
queue.add(request);
return request;
}
/**
* Deregister the file descriptor, a no-op if already polled.
*/
private void deregister(int fdVal) {
Thread previous = map.remove(fdVal);
assert previous == null || previous == Thread.currentThread();
if (previous != null) {
implDeregister(fdVal);
}
}
/**
* A registration request queued to the updater thread.
*/
private static class Request {
private final int fdVal;
private volatile boolean done;
private volatile Thread waiter;
Request(int fdVal) {
this.fdVal = fdVal;
}
private int fdVal() {
return fdVal;
}
/**
* Invoked by the updater when the request has been processed.
*/
void finish() {
done = true;
Thread waiter = this.waiter;
if (waiter != null) {
LockSupport.unpark(waiter);
}
}
/**
* Waits for a request to be processed.
*/
void awaitFinish() {
if (!done) {
waiter = Thread.currentThread();
boolean interrupted = false;
while (!done) {
LockSupport.park();
if (Thread.interrupted()) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* Register the file descriptor.
*/
abstract void implRegister(int fdVal) throws IOException;
/**
* Deregister the file descriptor.
*/
abstract void implDeregister(int fdVal);
/**
* Starts the poller threads.
*/
private Poller start() {
String prefix = (read) ? "Read" : "Write";
startThread(prefix + "-Poller", this::pollLoop);
if (!USE_DIRECT_REGISTER) {
startThread(prefix + "-Updater", this::updateLoop);
}
return this;
}
/**
* Starts a platform thread to run the given task.
*/
private void startThread(String name, Runnable task) {
try {
Thread thread = JLA.executeOnCarrierThread(() ->
InnocuousThread.newSystemThread(name, task)
);
thread.setDaemon(true);
thread.start();
} catch (Exception e) {
throw new InternalError(e);
}
}
/**
* Polling loop.
*/
private void pollLoop() {
try {
for (;;) {
poll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* The update loop to handle updates to the interest set.
*/
private void updateLoop() {
try {
for (;;) {
Request req = null;
while (req == null) {
try {
req = queue.take();
} catch (InterruptedException ignore) { }
}
implRegister(req.fdVal());
req.finish();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Maps the file descriptor value to a read poller.
*/
private static Poller readPoller(int fdVal) {
return READ_POLLERS[fdVal & READ_MASK];
}
/**
* Maps the file descriptor value to a write poller.
*/
private static Poller writePoller(int fdVal) {
return WRITE_POLLERS[fdVal & WRITE_MASK];
}
/**
* Unparks any thread that is polling the given file descriptor for the
* given event.
*/
static void stopPoll(int fdVal, int event) {
if (event == Net.POLLIN) {
readPoller(fdVal).wakeup(fdVal);
} else if (event == Net.POLLOUT) {
writePoller(fdVal).wakeup(fdVal);
} else {
throw new IllegalArgumentException();
}
}
/**
* Unparks any threads that are polling the given file descriptor.
*/
static void stopPoll(int fdVal) {
stopPoll(fdVal, Net.POLLIN);
stopPoll(fdVal, Net.POLLOUT);
}
/**
* Unparks any thread that is polling the given file descriptor.
*/
private void wakeup(int fdVal) {
Thread t = map.remove(fdVal);
if (t != null) {
LockSupport.unpark(t);
}
}
/**
* Called by the polling facility when the file descriptor is polled
*/
final void polled(int fdVal) {
wakeup(fdVal);
}
/**
* Poll for events. The {@link #polled(int)} method is invoked for each
* polled file descriptor.
*
* @param timeout if positive then block for up to {@code timeout} milliseconds,
* if zero then don't block, if -1 then block indefinitely
*/
abstract int poll(int timeout) throws IOException;
/**
* Poll for events, blocks indefinitely.
*/
final int poll() throws IOException {
return poll(-1);
}
/**
* Poll for events, non-blocking.
*/
final int pollNow() throws IOException {
return poll(0);
}
/**
* Returns the poller's file descriptor, or -1 if none.
*/
int fdVal() {
return -1;
}
/**
* Creates the read and writer pollers.
*/
static {
PollerProvider provider = PollerProvider.provider();
String s = GetPropertyAction.privilegedGetProperty("jdk.useDirectRegister");
if (s == null) {
USE_DIRECT_REGISTER = provider.useDirectRegister();
} else {
USE_DIRECT_REGISTER = "".equals(s) || Boolean.parseBoolean(s);
}
try {
Poller[] readPollers = createReadPollers(provider);
READ_POLLERS = readPollers;
READ_MASK = readPollers.length - 1;
Poller[] writePollers = createWritePollers(provider);
WRITE_POLLERS = writePollers;
WRITE_MASK = writePollers.length - 1;
} catch (IOException ioe) {
throw new IOError(ioe);
}
}
/**
* Create the read poller(s).
*/
private static Poller[] createReadPollers(PollerProvider provider) throws IOException {
int readPollerCount = pollerCount("jdk.readPollers");
Poller[] readPollers = new Poller[readPollerCount];
for (int i = 0; i< readPollerCount; i++) {
var poller = provider.readPoller();
readPollers[i] = poller.start();
}
return readPollers;
}
/**
* Create the write poller(s).
*/
private static Poller[] createWritePollers(PollerProvider provider) throws IOException {
int writePollerCount = pollerCount("jdk.writePollers");
Poller[] writePollers = new Poller[writePollerCount];
for (int i = 0; i< writePollerCount; i++) {
var poller = provider.writePoller();
writePollers[i] = poller.start();
}
return writePollers;
}
/**
* Reads the given property name to get the poller count. If the property is
* set then the value must be a power of 2. Returns 1 if the property is not
* set.
* @throws IllegalArgumentException if the property is set to a value that
* is not a power of 2.
*/
private static int pollerCount(String propName) {
String s = GetPropertyAction.privilegedGetProperty(propName, "1");
int count = Integer.parseInt(s);
// check power of 2
if (count != (1 << log2(count))) {
String msg = propName + " is set to a vale that is not a power of 2";
throw new IllegalArgumentException(msg);
}
return count;
}
private static int log2(int n) {
return 31 - Integer.numberOfLeadingZeros(n);
}
/**
* Return a stream of all threads blocked waiting for I/O operations.
*/
public static Stream<Thread> blockedThreads() {
Stream<Thread> s = Stream.empty();
for (int i = 0; i < READ_POLLERS.length; i++) {
s = Stream.concat(s, READ_POLLERS[i].registeredThreads());
}
for (int i = 0; i < WRITE_POLLERS.length; i++) {
s = Stream.concat(s, WRITE_POLLERS[i].registeredThreads());
}
return s;
}
private Stream<Thread> registeredThreads() {
return map.values().stream();
}
}

View file

@ -0,0 +1,70 @@
/*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package sun.nio.ch;
import java.io.IOException;
import java.util.ServiceConfigurationError;
import sun.security.action.GetPropertyAction;
abstract class PollerProvider {
PollerProvider() { }
/**
* Returns true if threads should register file descriptors directly,
* false to queue registrations to an updater thread.
*
* The default implementation returns false.
*/
boolean useDirectRegister() {
return false;
}
/**
* Creates a Poller for read ops.
*/
abstract Poller readPoller() throws IOException;
/**
* Creates a Poller for write ops.
*/
abstract Poller writePoller() throws IOException;
/**
* Creates the PollerProvider.
*/
static PollerProvider provider() {
String cn = GetPropertyAction.privilegedGetProperty("jdk.PollerProvider");
if (cn != null) {
try {
Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
return (PollerProvider) clazz.getConstructor().newInstance();
} catch (Exception e) {
throw new ServiceConfigurationError(null, e);
}
} else {
return new DefaultPollerProvider();
}
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -28,7 +28,6 @@ package sun.nio.ch;
import java.nio.channels.Channel;
import java.io.FileDescriptor;
import java.io.IOException;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
@ -83,13 +82,17 @@ public interface SelChImpl extends Channel {
* @param nanos the timeout to wait; {@code <= 0} to wait indefinitely
*/
default void park(int event, long nanos) throws IOException {
long millis;
if (nanos <= 0) {
millis = -1;
if (Thread.currentThread().isVirtual()) {
Poller.poll(getFDVal(), event, nanos, this::isOpen);
} else {
millis = NANOSECONDS.toMillis(nanos);
long millis;
if (nanos <= 0) {
millis = -1;
} else {
millis = NANOSECONDS.toMillis(nanos);
}
Net.poll(getFD(), event, millis);
}
Net.poll(getFD(), event, millis);
}
/**
@ -105,5 +108,4 @@ public interface SelChImpl extends Channel {
default void park(int event) throws IOException {
park(event, 0L);
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -103,6 +103,13 @@ class ServerSocketChannelImpl
// Our socket adaptor, if any
private ServerSocket socket;
// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
// blocking mode and the channel's socket is in non-blocking mode then
// operations that don't complete immediately will poll the socket and
// preserve the semantics of blocking operations.
private volatile boolean forcedNonBlocking;
// -- End of fields protected by stateLock
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
@ -388,6 +395,7 @@ class ServerSocketChannelImpl
boolean blocking = isBlocking();
try {
begin(blocking);
configureSocketNonBlockingIfVirtualThread();
n = implAccept(this.fd, newfd, saa);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -514,27 +522,28 @@ class ServerSocketChannelImpl
}
/**
* Adjust the blocking. acceptLock must already be held.
* Adjusts the blocking mode.
*/
private void lockedConfigureBlocking(boolean block) throws IOException {
assert acceptLock.isHeldByCurrentThread();
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, block);
// do nothing if virtual thread has forced the socket to be non-blocking
if (!forcedNonBlocking) {
IOUtil.configureBlocking(fd, block);
}
}
}
/**
* Adjusts the blocking mode if the channel is open. acceptLock must already
* be held.
*
* @return {@code true} if the blocking mode was adjusted, {@code false} if
* the blocking mode was not adjusted because the channel is closed
* Attempts to adjust the blocking mode if the channel is open.
* @return {@code true} if the blocking mode was adjusted
*/
private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
assert acceptLock.isHeldByCurrentThread();
synchronized (stateLock) {
if (isOpen()) {
// do nothing if virtual thread has forced the socket to be non-blocking
if (!forcedNonBlocking && isOpen()) {
IOUtil.configureBlocking(fd, block);
return true;
} else {
@ -543,6 +552,20 @@ class ServerSocketChannelImpl
}
}
/**
* Ensures that the socket is configured non-blocking when on a virtual thread.
*/
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
assert acceptLock.isHeldByCurrentThread();
if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, false);
forcedNonBlocking = true;
}
}
}
/**
* Closes the socket if there are no accept in progress and the channel is
* not registered with a Selector.
@ -583,8 +606,12 @@ class ServerSocketChannelImpl
if (!tryClose()) {
long th = thread;
if (th != 0) {
nd.preClose(fd);
NativeThread.signal(th);
if (NativeThread.isVirtualThread(th)) {
Poller.stopPoll(fdVal);
} else {
nd.preClose(fd);
NativeThread.signal(th);
}
}
}
}

View file

@ -65,11 +65,15 @@ class SocketAdaptor
@SuppressWarnings("removal")
static Socket create(SocketChannelImpl sc) {
PrivilegedExceptionAction<Socket> pa = () -> new SocketAdaptor(sc);
try {
return AccessController.doPrivileged(pa);
} catch (PrivilegedActionException pae) {
throw new InternalError("Should not reach here", pae);
if (System.getSecurityManager() == null) {
return new SocketAdaptor(sc);
} else {
PrivilegedExceptionAction<Socket> pa = () -> new SocketAdaptor(sc);
return AccessController.doPrivileged(pa);
}
} catch (SocketException | PrivilegedActionException e) {
throw new InternalError(e);
}
}

View file

@ -123,6 +123,13 @@ class SocketChannelImpl
// Socket adaptor, created on demand
private Socket socket;
// True if the channel's socket has been forced into non-blocking mode
// by a virtual thread. It cannot be reset. When the channel is in
// blocking mode and the channel's socket is in non-blocking mode then
// operations that don't complete immediately will poll the socket and
// preserve the semantics of blocking operations.
private volatile boolean forcedNonBlocking;
// -- End of fields protected by stateLock
SocketChannelImpl(SelectorProvider sp) throws IOException {
@ -414,6 +421,7 @@ class SocketChannelImpl
if (isInputClosed)
return IOStatus.EOF;
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.read(fd, buf, -1, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -457,6 +465,7 @@ class SocketChannelImpl
if (isInputClosed)
return IOStatus.EOF;
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.read(fd, dsts, offset, length, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -529,6 +538,7 @@ class SocketChannelImpl
int n = 0;
try {
beginWrite(blocking);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.write(fd, buf, -1, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -560,6 +570,7 @@ class SocketChannelImpl
long n = 0;
try {
beginWrite(blocking);
configureSocketNonBlockingIfVirtualThread();
n = IOUtil.write(fd, srcs, offset, length, nd);
if (blocking) {
while (IOStatus.okayToRetry(n) && isOpen()) {
@ -589,12 +600,12 @@ class SocketChannelImpl
int n = 0;
try {
beginWrite(blocking);
if (blocking) {
do {
n = Net.sendOOB(fd, b);
} while (n == IOStatus.INTERRUPTED && isOpen());
} else {
configureSocketNonBlockingIfVirtualThread();
do {
n = Net.sendOOB(fd, b);
} while (n == IOStatus.INTERRUPTED && isOpen());
if (blocking && n == IOStatus.UNAVAILABLE) {
throw new SocketException("No buffer space available");
}
} finally {
endWrite(blocking, n > 0);
@ -623,27 +634,28 @@ class SocketChannelImpl
}
/**
* Adjusts the blocking mode. readLock or writeLock must already be held.
* Adjust the blocking mode while holding readLock or writeLock.
*/
private void lockedConfigureBlocking(boolean block) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, block);
// do nothing if virtual thread has forced the socket to be non-blocking
if (!forcedNonBlocking) {
IOUtil.configureBlocking(fd, block);
}
}
}
/**
* Adjusts the blocking mode if the channel is open. readLock or writeLock
* must already be held.
*
* @return {@code true} if the blocking mode was adjusted, {@code false} if
* the blocking mode was not adjusted because the channel is closed
* Attempts to adjust the blocking mode if the channel is open.
* @return {@code true} if the blocking mode was adjusted
*/
private boolean tryLockedConfigureBlocking(boolean block) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
synchronized (stateLock) {
if (isOpen()) {
// do nothing if virtual thread has forced the socket to be non-blocking
if (!forcedNonBlocking && isOpen()) {
IOUtil.configureBlocking(fd, block);
return true;
} else {
@ -652,6 +664,20 @@ class SocketChannelImpl
}
}
/**
* Ensures that the socket is configured non-blocking when on a virtual thread.
*/
private void configureSocketNonBlockingIfVirtualThread() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
if (!forcedNonBlocking && Thread.currentThread().isVirtual()) {
synchronized (stateLock) {
ensureOpen();
IOUtil.configureBlocking(fd, false);
forcedNonBlocking = true;
}
}
}
/**
* Returns the local address, or null if not bound
*/
@ -846,6 +872,7 @@ class SocketChannelImpl
boolean connected = false;
try {
beginConnect(blocking, sa);
configureSocketNonBlockingIfVirtualThread();
int n;
if (isUnixSocket()) {
n = UnixDomainSockets.connect(fd, sa);
@ -1009,15 +1036,32 @@ class SocketChannelImpl
private void implCloseBlockingMode() throws IOException {
synchronized (stateLock) {
assert state < ST_CLOSING;
boolean connected = (state == ST_CONNECTED);
state = ST_CLOSING;
if (!tryClose()) {
// shutdown output when linger interval not set to 0
if (connected) {
try {
var SO_LINGER = StandardSocketOptions.SO_LINGER;
if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
Net.shutdown(fd, Net.SHUT_WR);
}
} catch (IOException ignore) { }
}
long reader = readerThread;
long writer = writerThread;
if (reader != 0 || writer != 0) {
if (NativeThread.isVirtualThread(reader)
|| NativeThread.isVirtualThread(writer)) {
Poller.stopPoll(fdVal);
}
if (NativeThread.isNativeThread(reader)
|| NativeThread.isNativeThread(writer)) {
nd.preClose(fd);
if (reader != 0)
if (NativeThread.isNativeThread(reader))
NativeThread.signal(reader);
if (writer != 0)
if (NativeThread.isNativeThread(writer))
NativeThread.signal(writer);
}
}
@ -1098,9 +1142,12 @@ class SocketChannelImpl
throw new NotYetConnectedException();
if (!isInputClosed) {
Net.shutdown(fd, Net.SHUT_RD);
long thread = readerThread;
if (thread != 0)
NativeThread.signal(thread);
long reader = readerThread;
if (NativeThread.isVirtualThread(reader)) {
Poller.stopPoll(fdVal, Net.POLLIN);
} else if (NativeThread.isNativeThread(reader)) {
NativeThread.signal(reader);
}
isInputClosed = true;
}
return this;
@ -1115,9 +1162,12 @@ class SocketChannelImpl
throw new NotYetConnectedException();
if (!isOutputClosed) {
Net.shutdown(fd, Net.SHUT_WR);
long thread = writerThread;
if (thread != 0)
NativeThread.signal(thread);
long writer = writerThread;
if (NativeThread.isVirtualThread(writer)) {
Poller.stopPoll(fdVal, Net.POLLOUT);
} else if (NativeThread.isNativeThread(writer)) {
NativeThread.signal(writer);
}
isOutputClosed = true;
}
return this;
@ -1282,6 +1332,7 @@ class SocketChannelImpl
}
} else {
// read, no timeout
configureSocketNonBlockingIfVirtualThread();
n = tryRead(b, off, len);
while (IOStatus.okayToRetry(n) && isOpen()) {
park(Net.POLLIN);
@ -1343,6 +1394,7 @@ class SocketChannelImpl
int end = off + len;
try {
beginWrite(true);
configureSocketNonBlockingIfVirtualThread();
while (pos < end && isOpen()) {
int size = end - pos;
int n = tryWrite(b, pos, size);

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2000, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -37,12 +37,15 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.access.foreign.MemorySegmentProxy;
import jdk.internal.misc.TerminatingThreadLocal;
import jdk.internal.misc.Unsafe;
import sun.security.action.GetPropertyAction;
public class Util {
private static JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
// -- Caches --
@ -75,8 +78,7 @@ public class Util {
* for potential future-proofing.
*/
private static long getMaxCachedBufferSize() {
String s = GetPropertyAction
.privilegedGetProperty("jdk.nio.maxCachedBufferSize");
String s = GetPropertyAction.privilegedGetProperty("jdk.nio.maxCachedBufferSize");
if (s != null) {
try {
long m = Long.parseLong(s);
@ -228,7 +230,7 @@ public class Util {
return ByteBuffer.allocateDirect(size);
}
BufferCache cache = bufferCache.get();
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
@ -255,7 +257,7 @@ public class Util {
.alignedSlice(alignment);
}
BufferCache cache = bufferCache.get();
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
ByteBuffer buf = cache.get(size);
if (buf != null) {
if (buf.alignmentOffset(0, alignment) == 0) {
@ -292,7 +294,7 @@ public class Util {
}
assert buf != null;
BufferCache cache = bufferCache.get();
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
if (!cache.offerFirst(buf)) {
// cache is full
free(buf);
@ -314,7 +316,7 @@ public class Util {
}
assert buf != null;
BufferCache cache = bufferCache.get();
BufferCache cache = JLA.getCarrierThreadLocal(bufferCache);
if (!cache.offerLast(buf)) {
// cache is full
free(buf);

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -42,6 +42,7 @@ import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import jdk.internal.misc.InternalLock;
public class StreamDecoder extends Reader {
@ -122,89 +123,145 @@ public class StreamDecoder extends Reader {
return read0();
}
@SuppressWarnings("fallthrough")
private int read0() throws IOException {
synchronized (lock) {
// Return the leftover char, if there is one
if (haveLeftoverChar) {
haveLeftoverChar = false;
return leftoverChar;
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
return lockedRead0();
} finally {
locker.unlock();
}
// Convert more bytes
char[] cb = new char[2];
int n = read(cb, 0, 2);
switch (n) {
case -1:
return -1;
case 2:
leftoverChar = cb[1];
haveLeftoverChar = true;
// FALL THROUGH
case 1:
return cb[0];
default:
assert false : n;
return -1;
} else {
synchronized (lock) {
return lockedRead0();
}
}
}
@SuppressWarnings("fallthrough")
private int lockedRead0() throws IOException {
// Return the leftover char, if there is one
if (haveLeftoverChar) {
haveLeftoverChar = false;
return leftoverChar;
}
// Convert more bytes
char[] cb = new char[2];
int n = read(cb, 0, 2);
switch (n) {
case -1:
return -1;
case 2:
leftoverChar = cb[1];
haveLeftoverChar = true;
// FALL THROUGH
case 1:
return cb[0];
default:
assert false : n;
return -1;
}
}
public int read(char[] cbuf, int offset, int length) throws IOException {
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
return lockedRead(cbuf, offset, length);
} finally {
locker.unlock();
}
} else {
synchronized (lock) {
return lockedRead(cbuf, offset, length);
}
}
}
private int lockedRead(char[] cbuf, int offset, int length) throws IOException {
int off = offset;
int len = length;
synchronized (lock) {
ensureOpen();
if ((off < 0) || (off > cbuf.length) || (len < 0) ||
((off + len) > cbuf.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
if (len == 0)
return 0;
int n = 0;
if (haveLeftoverChar) {
// Copy the leftover char into the buffer
cbuf[off] = leftoverChar;
off++; len--;
haveLeftoverChar = false;
n = 1;
if ((len == 0) || !implReady())
// Return now if this is all we can produce w/o blocking
return n;
}
if (len == 1) {
// Treat single-character array reads just like read()
int c = read0();
if (c == -1)
return (n == 0) ? -1 : n;
cbuf[off] = (char)c;
return n + 1;
}
return n + implRead(cbuf, off, off + len);
ensureOpen();
if ((off < 0) || (off > cbuf.length) || (len < 0) ||
((off + len) > cbuf.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
if (len == 0)
return 0;
int n = 0;
if (haveLeftoverChar) {
// Copy the leftover char into the buffer
cbuf[off] = leftoverChar;
off++; len--;
haveLeftoverChar = false;
n = 1;
if ((len == 0) || !implReady())
// Return now if this is all we can produce w/o blocking
return n;
}
if (len == 1) {
// Treat single-character array reads just like read()
int c = read0();
if (c == -1)
return (n == 0) ? -1 : n;
cbuf[off] = (char)c;
return n + 1;
}
return n + implRead(cbuf, off, off + len);
}
public boolean ready() throws IOException {
synchronized (lock) {
ensureOpen();
return haveLeftoverChar || implReady();
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
return lockedReady();
} finally {
locker.unlock();
}
} else {
synchronized (lock) {
return lockedReady();
}
}
}
private boolean lockedReady() throws IOException {
ensureOpen();
return haveLeftoverChar || implReady();
}
public void close() throws IOException {
synchronized (lock) {
if (closed)
return;
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
implClose();
lockedClose();
} finally {
closed = true;
locker.unlock();
}
} else {
synchronized (lock) {
lockedClose();
}
}
}
private void lockedClose() throws IOException {
if (closed)
return;
try {
implClose();
} finally {
closed = true;
}
}
@ -236,7 +293,7 @@ public class StreamDecoder extends Reader {
this.decoder = dec;
this.in = in;
this.ch = null;
bb = ByteBuffer.allocate(DEFAULT_BYTE_BUFFER_SIZE);
this.bb = ByteBuffer.allocate(DEFAULT_BYTE_BUFFER_SIZE);
bb.flip(); // So that bb is initially empty
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -38,11 +38,12 @@ import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
import jdk.internal.misc.InternalLock;
public class StreamEncoder extends Writer
{
public final class StreamEncoder extends Writer {
private static final int DEFAULT_BYTE_BUFFER_SIZE = 8192;
private static final int INITIAL_BYTE_BUFFER_CAPACITY = 512;
private static final int MAX_BYTE_BUFFER_CAPACITY = 8192;
private volatile boolean closed;
@ -106,14 +107,28 @@ public class StreamEncoder extends Writer
}
public void flushBuffer() throws IOException {
synchronized (lock) {
if (isOpen())
implFlushBuffer();
else
throw new IOException("Stream closed");
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
lockedFlushBuffer();
} finally {
locker.unlock();
}
} else {
synchronized (lock) {
lockedFlushBuffer();
}
}
}
private void lockedFlushBuffer() throws IOException {
if (isOpen())
implFlushBuffer();
else
throw new IOException("Stream closed");
}
public void write(int c) throws IOException {
char[] cbuf = new char[1];
cbuf[0] = (char) c;
@ -121,18 +136,32 @@ public class StreamEncoder extends Writer
}
public void write(char[] cbuf, int off, int len) throws IOException {
synchronized (lock) {
ensureOpen();
if ((off < 0) || (off > cbuf.length) || (len < 0) ||
((off + len) > cbuf.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
lockedWrite(cbuf, off, len);
} finally {
locker.unlock();
}
} else {
synchronized (lock) {
lockedWrite(cbuf, off, len);
}
implWrite(cbuf, off, len);
}
}
private void lockedWrite(char[] cbuf, int off, int len) throws IOException {
ensureOpen();
if ((off < 0) || (off > cbuf.length) || (len < 0) ||
((off + len) > cbuf.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
implWrite(cbuf, off, len);
}
public void write(String str, int off, int len) throws IOException {
/* Check the len before creating a char buffer */
if (len < 0)
@ -145,31 +174,73 @@ public class StreamEncoder extends Writer
public void write(CharBuffer cb) throws IOException {
int position = cb.position();
try {
synchronized (lock) {
ensureOpen();
implWrite(cb);
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
lockedWrite(cb);
} finally {
locker.unlock();
}
} else {
synchronized (lock) {
lockedWrite(cb);
}
}
} finally {
cb.position(position);
}
}
private void lockedWrite(CharBuffer cb) throws IOException {
ensureOpen();
implWrite(cb);
}
public void flush() throws IOException {
synchronized (lock) {
ensureOpen();
implFlush();
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
lockedFlush();
} finally {
locker.unlock();
}
} else {
synchronized (lock) {
lockedFlush();
}
}
}
private void lockedFlush() throws IOException {
ensureOpen();
implFlush();
}
public void close() throws IOException {
synchronized (lock) {
if (closed)
return;
Object lock = this.lock;
if (lock instanceof InternalLock locker) {
locker.lock();
try {
implClose();
lockedClose();
} finally {
closed = true;
locker.unlock();
}
} else {
synchronized (lock) {
lockedClose();
}
}
}
private void lockedClose() throws IOException {
if (closed)
return;
try {
implClose();
} finally {
closed = true;
}
}
@ -182,7 +253,8 @@ public class StreamEncoder extends Writer
private final Charset cs;
private final CharsetEncoder encoder;
private final ByteBuffer bb;
private ByteBuffer bb;
private final int maxBufferCapacity;
// Exactly one of these is non-null
private final OutputStream out;
@ -206,7 +278,9 @@ public class StreamEncoder extends Writer
this.ch = null;
this.cs = enc.charset();
this.encoder = enc;
this.bb = ByteBuffer.allocate(DEFAULT_BYTE_BUFFER_SIZE);
this.bb = ByteBuffer.allocate(INITIAL_BYTE_BUFFER_CAPACITY);
this.maxBufferCapacity = MAX_BYTE_BUFFER_CAPACITY;
}
private StreamEncoder(WritableByteChannel ch, CharsetEncoder enc, int mbc) {
@ -214,9 +288,14 @@ public class StreamEncoder extends Writer
this.ch = ch;
this.cs = enc.charset();
this.encoder = enc;
this.bb = ByteBuffer.allocate(mbc < 0
? DEFAULT_BYTE_BUFFER_SIZE
: mbc);
if (mbc > 0) {
this.bb = ByteBuffer.allocate(mbc);
this.maxBufferCapacity = mbc;
} else {
this.bb = ByteBuffer.allocate(INITIAL_BYTE_BUFFER_CAPACITY);
this.maxBufferCapacity = MAX_BYTE_BUFFER_CAPACITY;
}
}
private void writeBytes() throws IOException {
@ -289,6 +368,8 @@ public class StreamEncoder extends Writer
flushLeftoverChar(cb, false);
}
growByteBufferIfNeeded(cb.remaining());
while (cb.hasRemaining()) {
CoderResult cr = encoder.encode(cb, bb, false);
if (cr.isUnderflow()) {
@ -308,6 +389,21 @@ public class StreamEncoder extends Writer
}
}
/**
* Grows bb to a capacity to allow len characters be encoded.
*/
void growByteBufferIfNeeded(int len) throws IOException {
int cap = bb.capacity();
if (cap < maxBufferCapacity) {
int maxBytes = len * Math.round(encoder.maxBytesPerChar());
int newCap = Math.min(maxBytes, maxBufferCapacity);
if (newCap > cap) {
implFlushBuffer();
bb = ByteBuffer.allocate(newCap);
}
}
}
void implFlushBuffer() throws IOException {
if (bb.position() > 0) {
writeBytes();

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2008, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,6 +25,8 @@
package sun.nio.fs;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.misc.TerminatingThreadLocal;
import jdk.internal.misc.Unsafe;
@ -33,7 +35,7 @@ import jdk.internal.misc.Unsafe;
*/
class NativeBuffers {
private NativeBuffers() { }
private static JavaLangAccess JLA = SharedSecrets.getJavaLangAccess();
private static final Unsafe unsafe = Unsafe.getUnsafe();
@ -54,6 +56,8 @@ class NativeBuffers {
}
};
private NativeBuffers() { }
/**
* Allocates a native buffer, of at least the given size, from the heap.
*/
@ -69,7 +73,7 @@ class NativeBuffers {
*/
static NativeBuffer getNativeBufferFromCache(int size) {
// return from cache if possible
NativeBuffer[] buffers = threadLocal.get();
NativeBuffer[] buffers = JLA.getCarrierThreadLocal(threadLocal);
if (buffers != null) {
for (int i=0; i<TEMP_BUF_POOL_SIZE; i++) {
NativeBuffer buffer = buffers[i];
@ -103,11 +107,11 @@ class NativeBuffers {
*/
static void releaseNativeBuffer(NativeBuffer buffer) {
// create cache if it doesn't exist
NativeBuffer[] buffers = threadLocal.get();
NativeBuffer[] buffers = JLA.getCarrierThreadLocal(threadLocal);
if (buffers == null) {
buffers = new NativeBuffer[TEMP_BUF_POOL_SIZE];
buffers[0] = buffer;
threadLocal.set(buffers);
JLA.setCarrierThreadLocal(threadLocal, buffers);
return;
}
// Put it in an empty slot if such exists

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2022, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -354,7 +354,7 @@ public final class SSLLogger {
Object[] messageFields = {
logger.loggerName,
level.getName(),
Utilities.toHexString(Thread.currentThread().getId()),
Utilities.toHexString(Thread.currentThread().threadId()),
Thread.currentThread().getName(),
dateTimeFormat.format(Instant.now()),
formatCaller(),
@ -371,7 +371,7 @@ public final class SSLLogger {
Object[] messageFields = {
logger.loggerName,
level.getName(),
Utilities.toHexString(Thread.currentThread().getId()),
Utilities.toHexString(Thread.currentThread().threadId()),
Thread.currentThread().getName(),
dateTimeFormat.format(Instant.now()),
formatCaller(),