8318422: Allow poller threads be virtual threads

Reviewed-by: michaelm
This commit is contained in:
Alan Bateman 2023-11-04 06:52:19 +00:00
parent 29cf2c471b
commit c099cf53f2
13 changed files with 368 additions and 399 deletions

View file

@ -33,12 +33,12 @@ class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { } DefaultPollerProvider() { }
@Override @Override
Poller readPoller() throws IOException { Poller readPoller(boolean subPoller) throws IOException {
return new PollsetPoller(true); return new PollsetPoller(true);
} }
@Override @Override
Poller writePoller() throws IOException { Poller writePoller(boolean subPoller) throws IOException {
return new PollsetPoller(false); return new PollsetPoller(false);
} }
} }

View file

@ -42,11 +42,12 @@ class PollsetPoller extends Poller {
MAX_EVENTS_TO_POLL = 512; MAX_EVENTS_TO_POLL = 512;
} }
private final int event;
private final int setid; private final int setid;
private final long pollBuffer; private final long pollBuffer;
PollsetPoller(boolean read) throws IOException { PollsetPoller(boolean read) throws IOException {
super(read); this.event = (read) ? Net.POLLIN : Net.POLLOUT;
this.setid = Pollset.pollsetCreate(); this.setid = Pollset.pollsetCreate();
this.pollBuffer = Pollset.allocatePollArray(MAX_EVENTS_TO_POLL); this.pollBuffer = Pollset.allocatePollArray(MAX_EVENTS_TO_POLL);
} }
@ -58,8 +59,7 @@ class PollsetPoller extends Poller {
@Override @Override
void implRegister(int fd) throws IOException { void implRegister(int fd) throws IOException {
int ret = Pollset.pollsetCtl(setid, Pollset.PS_MOD, fd, int ret = Pollset.pollsetCtl(setid, Pollset.PS_MOD, fd, Pollset.PS_POLLPRI | event);
Pollset.PS_POLLPRI | (this.reading() ? Net.POLLIN : Net.POLLOUT));
if (ret != 0) { if (ret != 0) {
throw new IOException("Unable to register fd " + fd); throw new IOException("Unable to register fd " + fd);
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -25,6 +25,7 @@
package sun.nio.ch; package sun.nio.ch;
import java.io.IOException; import java.io.IOException;
import jdk.internal.vm.ContinuationSupport;
/** /**
* Default PollerProvider for Linux. * Default PollerProvider for Linux.
@ -33,12 +34,31 @@ class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { } DefaultPollerProvider() { }
@Override @Override
Poller readPoller() throws IOException { Poller.Mode defaultPollerMode() {
return new EPollPoller(true); if (ContinuationSupport.isSupported()) {
return Poller.Mode.VTHREAD_POLLERS;
} else {
return Poller.Mode.SYSTEM_THREADS;
}
} }
@Override @Override
Poller writePoller() throws IOException { int defaultReadPollers(Poller.Mode mode) {
return new EPollPoller(false); int ncpus = Runtime.getRuntime().availableProcessors();
if (mode == Poller.Mode.VTHREAD_POLLERS) {
return Math.min(Integer.highestOneBit(ncpus), 32);
} else {
return Math.max(Integer.highestOneBit(ncpus / 4), 1);
}
}
@Override
Poller readPoller(boolean subPoller) throws IOException {
return new EPollPoller(subPoller, true);
}
@Override
Poller writePoller(boolean subPoller) throws IOException {
return new EPollPoller(subPoller, false);
} }
} }

View file

@ -32,18 +32,18 @@ import static sun.nio.ch.EPoll.*;
*/ */
class EPollPoller extends Poller { class EPollPoller extends Poller {
private static final int MAX_EVENTS_TO_POLL = 512;
private static final int ENOENT = 2; private static final int ENOENT = 2;
private final int epfd; private final int epfd;
private final int event; private final int event;
private final int maxEvents;
private final long address; private final long address;
EPollPoller(boolean read) throws IOException { EPollPoller(boolean subPoller, boolean read) throws IOException {
super(read);
this.epfd = EPoll.create(); this.epfd = EPoll.create();
this.event = (read) ? EPOLLIN : EPOLLOUT; this.event = (read) ? EPOLLIN : EPOLLOUT;
this.address = EPoll.allocatePollArray(MAX_EVENTS_TO_POLL); this.maxEvents = (subPoller) ? 64 : 512;
this.address = EPoll.allocatePollArray(maxEvents);
} }
@Override @Override
@ -68,7 +68,7 @@ class EPollPoller extends Poller {
@Override @Override
int poll(int timeout) throws IOException { int poll(int timeout) throws IOException {
int n = EPoll.wait(epfd, address, MAX_EVENTS_TO_POLL, timeout); int n = EPoll.wait(epfd, address, maxEvents, timeout);
int i = 0; int i = 0;
while (i < n) { while (i < n) {
long eventAddress = EPoll.getEvent(address, i); long eventAddress = EPoll.getEvent(address, i);

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -33,12 +33,12 @@ class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { } DefaultPollerProvider() { }
@Override @Override
Poller readPoller() throws IOException { Poller readPoller(boolean subPoller) throws IOException {
return new KQueuePoller(true); return new KQueuePoller(subPoller, true);
} }
@Override @Override
Poller writePoller() throws IOException { Poller writePoller(boolean subPoller) throws IOException {
return new KQueuePoller(false); return new KQueuePoller(subPoller, false);
} }
} }

View file

@ -31,17 +31,16 @@ import static sun.nio.ch.KQueue.*;
* Poller implementation based on the kqueue facility. * Poller implementation based on the kqueue facility.
*/ */
class KQueuePoller extends Poller { class KQueuePoller extends Poller {
private static final int MAX_EVENTS_TO_POLL = 512;
private final int kqfd; private final int kqfd;
private final int filter; private final int filter;
private final int maxEvents;
private final long address; private final long address;
KQueuePoller(boolean read) throws IOException { KQueuePoller(boolean subPoller, boolean read) throws IOException {
super(read);
this.kqfd = KQueue.create(); this.kqfd = KQueue.create();
this.filter = (read) ? EVFILT_READ : EVFILT_WRITE; this.filter = (read) ? EVFILT_READ : EVFILT_WRITE;
this.address = KQueue.allocatePollArray(MAX_EVENTS_TO_POLL); this.maxEvents = (subPoller) ? 64 : 512;
this.address = KQueue.allocatePollArray(maxEvents);
} }
@Override @Override
@ -63,7 +62,7 @@ class KQueuePoller extends Poller {
@Override @Override
int poll(int timeout) throws IOException { int poll(int timeout) throws IOException {
int n = KQueue.poll(kqfd, address, MAX_EVENTS_TO_POLL, timeout); int n = KQueue.poll(kqfd, address, maxEvents, timeout);
int i = 0; int i = 0;
while (i < n) { while (i < n) {
long keventAddress = KQueue.getEvent(address, i); long keventAddress = KQueue.getEvent(address, i);

View file

@ -24,52 +24,100 @@
*/ */
package sun.nio.ch; package sun.nio.ch;
import java.io.IOError;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier; import java.util.function.BooleanSupplier;
import java.util.stream.Stream;
import jdk.internal.misc.InnocuousThread; import jdk.internal.misc.InnocuousThread;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.SharedSecrets;
import sun.security.action.GetPropertyAction; import sun.security.action.GetPropertyAction;
/** /**
* Polls file descriptors. Virtual threads invoke the poll method to park * Polls file descriptors. Virtual threads invoke the poll method to park
* until a given file descriptor is ready for I/O. * until a given file descriptor is ready for I/O.
*/ */
public abstract class Poller { abstract class Poller {
private static final JavaLangAccess JLA = SharedSecrets.getJavaLangAccess(); private static final Pollers POLLERS;
private static final Poller[] READ_POLLERS; static {
private static final Poller[] WRITE_POLLERS; try {
private static final int READ_MASK, WRITE_MASK; var pollers = new Pollers();
private static final boolean USE_DIRECT_REGISTER; pollers.start();
POLLERS = pollers;
// true if this is a poller for reading, false for writing } catch (IOException ioe) {
private final boolean read; throw new ExceptionInInitializerError(ioe);
}
}
// maps file descriptors to parked Thread // maps file descriptors to parked Thread
private final Map<Integer, Thread> map = new ConcurrentHashMap<>(); private final Map<Integer, Thread> map = new ConcurrentHashMap<>();
// the queue of updates to the updater Thread /**
private final BlockingQueue<Request> queue = new LinkedTransferQueue<>(); * Poller mode.
*/
enum Mode {
/**
* ReadPoller and WritePoller are dedicated platform threads that block waiting
* for events and unpark virtual threads when file descriptors are ready for I/O.
*/
SYSTEM_THREADS,
/** /**
* Initialize a Poller for reading or writing. * ReadPoller and WritePoller threads are virtual threads that poll for events,
* yielding between polls and unparking virtual threads when file descriptors are
* ready for I/O. If there are no events then the poller threads park until there
* are I/O events to poll. This mode helps to integrate polling with virtual
* thread scheduling. The approach is similar to the default scheme in "User-level
* Threading: Have Your Cake and Eat It Too" by Karsten and Barghi 2020
* (https://dl.acm.org/doi/10.1145/3379483).
*/ */
protected Poller(boolean read) { VTHREAD_POLLERS
this.read = read;
} }
/** /**
* Returns true if this poller is for read (POLLIN) events. * Initialize a Poller.
*/ */
final boolean reading() { protected Poller() {
return read; }
/**
* Returns the poller's file descriptor, used when the read and write poller threads
* are virtual threads.
*
* @throws UnsupportedOperationException if not supported
*/
int fdVal() {
throw new UnsupportedOperationException();
}
/**
* Register the file descriptor.
*/
abstract void implRegister(int fdVal) throws IOException;
/**
* Deregister the file descriptor.
*/
abstract void implDeregister(int fdVal);
/**
* Poll for events. The {@link #polled(int)} method is invoked for each
* polled file descriptor.
*
* @param timeout if positive then block for up to {@code timeout} milliseconds,
* if zero then don't block, if -1 then block indefinitely
* @return the number of file descriptors polled
*/
abstract int poll(int timeout) throws IOException;
/**
* Callback by the poll method when a file descriptor is polled.
*/
final void polled(int fdVal) {
wakeup(fdVal);
} }
/** /**
@ -79,35 +127,45 @@ public abstract class Poller {
* @param nanos the waiting time or 0 to wait indefinitely * @param nanos the waiting time or 0 to wait indefinitely
* @param supplier supplies a boolean to indicate if the enclosing object is open * @param supplier supplies a boolean to indicate if the enclosing object is open
*/ */
public static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier) static void poll(int fdVal, int event, long nanos, BooleanSupplier supplier)
throws IOException throws IOException
{ {
assert nanos >= 0L; assert nanos >= 0L;
if (event == Net.POLLIN) { if (event == Net.POLLIN) {
readPoller(fdVal).poll(fdVal, nanos, supplier); POLLERS.readPoller(fdVal).poll(fdVal, nanos, supplier);
} else if (event == Net.POLLOUT) { } else if (event == Net.POLLOUT) {
writePoller(fdVal).poll(fdVal, nanos, supplier); POLLERS.writePoller(fdVal).poll(fdVal, nanos, supplier);
} else { } else {
assert false; assert false;
} }
} }
/** /**
* Parks the current thread until a file descriptor is ready. * If there is a thread polling the given file descriptor for the given event then
* the thread is unparked.
*/ */
private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { static void stopPoll(int fdVal, int event) {
if (USE_DIRECT_REGISTER) { if (event == Net.POLLIN) {
pollDirect(fdVal, nanos, supplier); POLLERS.readPoller(fdVal).wakeup(fdVal);
} else if (event == Net.POLLOUT) {
POLLERS.writePoller(fdVal).wakeup(fdVal);
} else { } else {
pollIndirect(fdVal, nanos, supplier); throw new IllegalArgumentException();
} }
} }
/** /**
* Parks the current thread until a file descriptor is ready. This implementation * If there are any threads polling the given file descriptor then they are unparked.
* registers the file descriptor, then parks until the file descriptor is polled.
*/ */
private void pollDirect(int fdVal, long nanos, BooleanSupplier supplier) throws IOException { static void stopPoll(int fdVal) {
stopPoll(fdVal, Net.POLLIN);
stopPoll(fdVal, Net.POLLOUT);
}
/**
* Parks the current thread until a file descriptor is ready.
*/
private void poll(int fdVal, long nanos, BooleanSupplier supplier) throws IOException {
register(fdVal); register(fdVal);
try { try {
boolean isOpen = supplier.getAsBoolean(); boolean isOpen = supplier.getAsBoolean();
@ -123,28 +181,6 @@ public abstract class Poller {
} }
} }
/**
* Parks the current thread until a file descriptor is ready. This implementation
* queues the file descriptor to the update thread, then parks until the file
* descriptor is polled.
*/
private void pollIndirect(int fdVal, long nanos, BooleanSupplier supplier) {
Request request = registerAsync(fdVal);
try {
boolean isOpen = supplier.getAsBoolean();
if (isOpen) {
if (nanos > 0) {
LockSupport.parkNanos(nanos);
} else {
LockSupport.park();
}
}
} finally {
request.awaitFinish();
deregister(fdVal);
}
}
/** /**
* Registers the file descriptor. * Registers the file descriptor.
*/ */
@ -154,18 +190,6 @@ public abstract class Poller {
implRegister(fdVal); implRegister(fdVal);
} }
/**
* Queues the file descriptor to be registered by the updater thread, returning
* a Request object to track the request.
*/
private Request registerAsync(int fdVal) {
Thread previous = map.putIfAbsent(fdVal, Thread.currentThread());
assert previous == null;
Request request = new Request(fdVal);
queue.add(request);
return request;
}
/** /**
* Deregister the file descriptor, a no-op if already polled. * Deregister the file descriptor, a no-op if already polled.
*/ */
@ -177,159 +201,6 @@ public abstract class Poller {
} }
} }
/**
* A registration request queued to the updater thread.
*/
private static class Request {
private final int fdVal;
private volatile boolean done;
private volatile Thread waiter;
Request(int fdVal) {
this.fdVal = fdVal;
}
private int fdVal() {
return fdVal;
}
/**
* Invoked by the updater when the request has been processed.
*/
void finish() {
done = true;
Thread waiter = this.waiter;
if (waiter != null) {
LockSupport.unpark(waiter);
}
}
/**
* Waits for a request to be processed.
*/
void awaitFinish() {
if (!done) {
waiter = Thread.currentThread();
boolean interrupted = false;
while (!done) {
LockSupport.park();
if (Thread.interrupted()) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* Register the file descriptor.
*/
abstract void implRegister(int fdVal) throws IOException;
/**
* Deregister the file descriptor.
*/
abstract void implDeregister(int fdVal);
/**
* Starts the poller threads.
*/
private Poller start() {
String prefix = (read) ? "Read" : "Write";
startThread(prefix + "-Poller", this::pollLoop);
if (!USE_DIRECT_REGISTER) {
startThread(prefix + "-Updater", this::updateLoop);
}
return this;
}
/**
* Starts a platform thread to run the given task.
*/
private void startThread(String name, Runnable task) {
try {
Thread thread = JLA.executeOnCarrierThread(() ->
InnocuousThread.newSystemThread(name, task)
);
thread.setDaemon(true);
thread.start();
} catch (Exception e) {
throw new InternalError(e);
}
}
/**
* Polling loop.
*/
private void pollLoop() {
try {
for (;;) {
poll();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* The update loop to handle updates to the interest set.
*/
private void updateLoop() {
try {
for (;;) {
Request req = null;
while (req == null) {
try {
req = queue.take();
} catch (InterruptedException ignore) { }
}
implRegister(req.fdVal());
req.finish();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Maps the file descriptor value to a read poller.
*/
private static Poller readPoller(int fdVal) {
return READ_POLLERS[fdVal & READ_MASK];
}
/**
* Maps the file descriptor value to a write poller.
*/
private static Poller writePoller(int fdVal) {
return WRITE_POLLERS[fdVal & WRITE_MASK];
}
/**
* Unparks any thread that is polling the given file descriptor for the
* given event.
*/
static void stopPoll(int fdVal, int event) {
if (event == Net.POLLIN) {
readPoller(fdVal).wakeup(fdVal);
} else if (event == Net.POLLOUT) {
writePoller(fdVal).wakeup(fdVal);
} else {
throw new IllegalArgumentException();
}
}
/**
* Unparks any threads that are polling the given file descriptor.
*/
static void stopPoll(int fdVal) {
stopPoll(fdVal, Net.POLLIN);
stopPoll(fdVal, Net.POLLOUT);
}
/** /**
* Unparks any thread that is polling the given file descriptor. * Unparks any thread that is polling the given file descriptor.
*/ */
@ -341,89 +212,145 @@ public abstract class Poller {
} }
/** /**
* Called by the polling facility when the file descriptor is polled * Master polling loop. The {@link #polled(int)} method is invoked for each file
* descriptor that is polled.
*/ */
final void polled(int fdVal) { private void pollerLoop() {
wakeup(fdVal);
}
/**
* Poll for events. The {@link #polled(int)} method is invoked for each
* polled file descriptor.
*
* @param timeout if positive then block for up to {@code timeout} milliseconds,
* if zero then don't block, if -1 then block indefinitely
*/
abstract int poll(int timeout) throws IOException;
/**
* Poll for events, blocks indefinitely.
*/
final int poll() throws IOException {
return poll(-1);
}
/**
* Poll for events, non-blocking.
*/
final int pollNow() throws IOException {
return poll(0);
}
/**
* Returns the poller's file descriptor, or -1 if none.
*/
int fdVal() {
return -1;
}
/**
* Creates the read and writer pollers.
*/
static {
PollerProvider provider = PollerProvider.provider();
String s = GetPropertyAction.privilegedGetProperty("jdk.useDirectRegister");
if (s == null) {
USE_DIRECT_REGISTER = provider.useDirectRegister();
} else {
USE_DIRECT_REGISTER = "".equals(s) || Boolean.parseBoolean(s);
}
try { try {
Poller[] readPollers = createReadPollers(provider); for (;;) {
READ_POLLERS = readPollers; poll(-1);
READ_MASK = readPollers.length - 1; }
Poller[] writePollers = createWritePollers(provider); } catch (Exception e) {
WRITE_POLLERS = writePollers; e.printStackTrace();
WRITE_MASK = writePollers.length - 1;
} catch (IOException ioe) {
throw new IOError(ioe);
} }
} }
/** /**
* Create the read poller(s). * Sub-poller polling loop. The {@link #polled(int)} method is invoked for each file
* descriptor that is polled.
*
* The sub-poller registers its file descriptor with the master poller to park until
* there are events to poll. When unparked, it does non-blocking polls and parks
* again when there are no more events. The sub-poller yields after each poll to help
* with fairness and to avoid re-registering with the master poller where possible.
*/ */
private static Poller[] createReadPollers(PollerProvider provider) throws IOException { private void subPollerLoop(Poller masterPoller) {
int readPollerCount = pollerCount("jdk.readPollers"); assert Thread.currentThread().isVirtual();
try {
int polled = 0;
for (;;) {
if (polled == 0) {
masterPoller.poll(fdVal(), 0, () -> true); // park
} else {
Thread.yield();
}
polled = poll(0);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* The Pollers used for read and write events.
*/
private static class Pollers {
private final PollerProvider provider;
private final Poller.Mode pollerMode;
private final Poller masterPoller;
private final Poller[] readPollers;
private final Poller[] writePollers;
// used by start method to executor is kept alive
private Executor executor;
/**
* Creates the Poller instances based on configuration.
*/
Pollers() throws IOException {
PollerProvider provider = PollerProvider.provider();
Poller.Mode mode;
String s = GetPropertyAction.privilegedGetProperty("jdk.pollerMode");
if (s != null) {
if (s.equalsIgnoreCase(Mode.SYSTEM_THREADS.name()) || s.equals("1")) {
mode = Mode.SYSTEM_THREADS;
} else if (s.equalsIgnoreCase(Mode.VTHREAD_POLLERS.name()) || s.equals("2")) {
mode = Mode.VTHREAD_POLLERS;
} else {
throw new RuntimeException("Can't parse '" + s + "' as polling mode");
}
} else {
mode = provider.defaultPollerMode();
}
// vthread poller mode needs a master poller
Poller masterPoller = (mode == Mode.VTHREAD_POLLERS)
? provider.readPoller(false)
: null;
// read pollers (or sub-pollers)
int readPollerCount = pollerCount("jdk.readPollers", provider.defaultReadPollers(mode));
Poller[] readPollers = new Poller[readPollerCount]; Poller[] readPollers = new Poller[readPollerCount];
for (int i = 0; i< readPollerCount; i++) { for (int i = 0; i < readPollerCount; i++) {
var poller = provider.readPoller(); readPollers[i] = provider.readPoller(mode == Mode.VTHREAD_POLLERS);
readPollers[i] = poller.start();
} }
return readPollers;
// write pollers (or sub-pollers)
int writePollerCount = pollerCount("jdk.writePollers", provider.defaultWritePollers(mode));
Poller[] writePollers = new Poller[writePollerCount];
for (int i = 0; i < writePollerCount; i++) {
writePollers[i] = provider.writePoller(mode == Mode.VTHREAD_POLLERS);
}
this.provider = provider;
this.pollerMode = mode;
this.masterPoller = masterPoller;
this.readPollers = readPollers;
this.writePollers = writePollers;
} }
/** /**
* Create the write poller(s). * Starts the Poller threads.
*/ */
private static Poller[] createWritePollers(PollerProvider provider) throws IOException { void start() {
int writePollerCount = pollerCount("jdk.writePollers"); if (pollerMode == Mode.VTHREAD_POLLERS) {
Poller[] writePollers = new Poller[writePollerCount]; startPlatformThread("MasterPoller", masterPoller::pollerLoop);
for (int i = 0; i< writePollerCount; i++) { ThreadFactory factory = Thread.ofVirtual()
var poller = provider.writePoller(); .inheritInheritableThreadLocals(false)
writePollers[i] = poller.start(); .name("SubPoller-", 0)
.uncaughtExceptionHandler((t, e) -> e.printStackTrace())
.factory();
executor = Executors.newThreadPerTaskExecutor(factory);
Arrays.stream(readPollers).forEach(p -> {
executor.execute(() -> p.subPollerLoop(masterPoller));
});
Arrays.stream(writePollers).forEach(p -> {
executor.execute(() -> p.subPollerLoop(masterPoller));
});
} else {
Arrays.stream(readPollers).forEach(p -> {
startPlatformThread("Read-Poller", p::pollerLoop);
});
Arrays.stream(writePollers).forEach(p -> {
startPlatformThread("Write-Poller", p::pollerLoop);
});
} }
return writePollers; }
/**
* Returns the read poller for the given file descriptor.
*/
Poller readPoller(int fdVal) {
int index = provider.fdValToIndex(fdVal, readPollers.length);
return readPollers[index];
}
/**
* Returns the write poller for the given file descriptor.
*/
Poller writePoller(int fdVal) {
int index = provider.fdValToIndex(fdVal, writePollers.length);
return writePollers[index];
} }
/** /**
@ -433,37 +360,30 @@ public abstract class Poller {
* @throws IllegalArgumentException if the property is set to a value that * @throws IllegalArgumentException if the property is set to a value that
* is not a power of 2. * is not a power of 2.
*/ */
private static int pollerCount(String propName) { private static int pollerCount(String propName, int defaultCount) {
String s = GetPropertyAction.privilegedGetProperty(propName, "1"); String s = GetPropertyAction.privilegedGetProperty(propName);
int count = Integer.parseInt(s); int count = (s != null) ? Integer.parseInt(s) : defaultCount;
// check power of 2 // check power of 2
if (count != (1 << log2(count))) { if (count != Integer.highestOneBit(count)) {
String msg = propName + " is set to a vale that is not a power of 2"; String msg = propName + " is set to a vale that is not a power of 2";
throw new IllegalArgumentException(msg); throw new IllegalArgumentException(msg);
} }
return count; return count;
} }
private static int log2(int n) {
return 31 - Integer.numberOfLeadingZeros(n);
}
/** /**
* Return a stream of all threads blocked waiting for I/O operations. * Starts a platform thread to run the given task.
*/ */
public static Stream<Thread> blockedThreads() { private void startPlatformThread(String name, Runnable task) {
Stream<Thread> s = Stream.empty(); try {
for (int i = 0; i < READ_POLLERS.length; i++) { Thread thread = InnocuousThread.newSystemThread(name, task);
s = Stream.concat(s, READ_POLLERS[i].registeredThreads()); thread.setDaemon(true);
thread.setUncaughtExceptionHandler((t, e) -> e.printStackTrace());
thread.start();
} catch (Exception e) {
throw new InternalError(e);
} }
for (int i = 0; i < WRITE_POLLERS.length; i++) {
s = Stream.concat(s, WRITE_POLLERS[i].registeredThreads());
} }
return s;
}
private Stream<Thread> registeredThreads() {
return map.values().stream();
} }
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2017, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -25,46 +25,63 @@
package sun.nio.ch; package sun.nio.ch;
import java.io.IOException; import java.io.IOException;
import java.util.ServiceConfigurationError;
import sun.security.action.GetPropertyAction;
/**
* Provider class for Poller implementations.
*/
abstract class PollerProvider { abstract class PollerProvider {
private static final PollerProvider INSTANCE = new DefaultPollerProvider();
PollerProvider() { } PollerProvider() { }
/** /**
* Returns true if threads should register file descriptors directly, * Returns the system-wide PollerProvider.
* false to queue registrations to an updater thread.
*
* The default implementation returns false.
*/ */
boolean useDirectRegister() { static PollerProvider provider() {
return false; return INSTANCE;
}
/**
* Returns the default poller mode.
* @implSpec The default implementation uses system threads.
*/
Poller.Mode defaultPollerMode() {
return Poller.Mode.SYSTEM_THREADS;
}
/**
* Default number of read pollers for the given mode. The count must be a power of 2.
* @implSpec The default implementation returns 1.
*/
int defaultReadPollers(Poller.Mode mode) {
return 1;
}
/**
* Default number of write pollers for the given mode. The count must be a power of 2.
* @implSpec The default implementation returns 1.
*/
int defaultWritePollers(Poller.Mode mode) {
return 1;
}
/**
* Maps a file descriptor to an index from 0 to {@code toIndex}.
* @implSpec The default implementation is good for Unix file descriptors.
*/
int fdValToIndex(int fdVal, int toIndex) {
return fdVal & (toIndex - 1);
} }
/** /**
* Creates a Poller for read ops. * Creates a Poller for read ops.
* @param subPoller true to create a sub-poller
*/ */
abstract Poller readPoller() throws IOException; abstract Poller readPoller(boolean subPoller) throws IOException;
/** /**
* Creates a Poller for write ops. * Creates a Poller for write ops.
* @param subPoller true to create a sub-poller
*/ */
abstract Poller writePoller() throws IOException; abstract Poller writePoller(boolean subPoller) throws IOException;
/**
* Creates the PollerProvider.
*/
static PollerProvider provider() {
String cn = GetPropertyAction.privilegedGetProperty("jdk.PollerProvider");
if (cn != null) {
try {
Class<?> clazz = Class.forName(cn, true, ClassLoader.getSystemClassLoader());
return (PollerProvider) clazz.getConstructor().newInstance();
} catch (Exception e) {
throw new ServiceConfigurationError(null, e);
}
} else {
return new DefaultPollerProvider();
}
}
} }

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -33,12 +33,26 @@ class DefaultPollerProvider extends PollerProvider {
DefaultPollerProvider() { } DefaultPollerProvider() { }
@Override @Override
Poller readPoller() throws IOException { int defaultReadPollers(Poller.Mode mode) {
assert mode == Poller.Mode.SYSTEM_THREADS;
int ncpus = Runtime.getRuntime().availableProcessors();
return Math.max(Integer.highestOneBit(ncpus / 8), 1);
}
@Override
int fdValToIndex(int fdVal, int toIndex) {
return (fdVal >> 2) & (toIndex - 1);
}
@Override
Poller readPoller(boolean subPoller) throws IOException {
assert !subPoller;
return new WEPollPoller(true); return new WEPollPoller(true);
} }
@Override @Override
Poller writePoller() throws IOException { Poller writePoller(boolean subPoller) throws IOException {
assert !subPoller;
return new WEPollPoller(false); return new WEPollPoller(false);
} }
} }

View file

@ -39,7 +39,6 @@ class WEPollPoller extends Poller {
private final long address; private final long address;
WEPollPoller(boolean read) throws IOException { WEPollPoller(boolean read) throws IOException {
super(read);
this.handle = WEPoll.create(); this.handle = WEPoll.create();
this.event = (read) ? EPOLLIN : EPOLLOUT; this.event = (read) ? EPOLLIN : EPOLLOUT;
this.address = WEPoll.allocatePollArray(MAX_EVENTS_TO_POLL); this.address = WEPoll.allocatePollArray(MAX_EVENTS_TO_POLL);

View file

@ -84,11 +84,11 @@ vmTestbase/nsk/jdb/repeat/repeat001/repeat001.java 8300707 generic-all
vmTestbase/nsk/jdi/ExceptionEvent/catchLocation/location002/TestDescription.java 8278470 generic-all vmTestbase/nsk/jdi/ExceptionEvent/catchLocation/location002/TestDescription.java 8278470 generic-all
### ###
# This test always times out on windows. This is due to the test forcing OOME in the # This test times out on Windows and Linux. This is due to the test forcing OOME in
# debuggee, which has the side affect of making the Read-Poller thread exit. Because # the debuggee, which can lead to I/O poller threads exiting. Because
# of this no vthreads can complete their reads, and the test times out as a result. # of this no vthreads can complete their reads, and the test times out as a result.
vmTestbase/nsk/jdi/VMOutOfMemoryException/VMOutOfMemoryException001/VMOutOfMemoryException001.java 8285417 windows-all vmTestbase/nsk/jdi/VMOutOfMemoryException/VMOutOfMemoryException001/VMOutOfMemoryException001.java 8285417 generic-all
########## ##########
## Tests incompatible with with virtual test thread factory. ## Tests incompatible with with virtual test thread factory.

View file

@ -24,17 +24,17 @@
/** /**
* @test id=default * @test id=default
* @bug 8284161 * @bug 8284161
* @summary Test virtual threads doing blocking I/O on java.net sockets * @summary Test virtual threads doing blocking I/O on java.net Sockets
* @library /test/lib * @library /test/lib
* @run junit BlockingSocketOps * @run junit BlockingSocketOps
*/ */
/** /**
* @test id=direct-register * @test id=poller-modes
* @summary Test virtual threads doing blocking I/O on java.net sockets and with * @requires (os.family == "linux") | (os.family == "mac")
* the I/O poller configured to use direct registration
* @library /test/lib * @library /test/lib
* @run junit/othervm -Djdk.useDirectRegister BlockingSocketOps * @run junit/othervm -Djdk.pollerMode=1 BlockingSocketOps
* @run junit/othervm -Djdk.pollerMode=2 BlockingSocketOps
*/ */
/** /**

View file

@ -30,11 +30,11 @@
*/ */
/** /**
* @test id=direct-register * @test id=poller-modes
* @summary Test virtual threads doing blocking I/O on NIO channels and with * @requires (os.family == "linux") | (os.family == "mac")
* the I/O poller configured to use direct registration
* @library /test/lib * @library /test/lib
* @run junit/othervm -Djdk.useDirectRegister BlockingChannelOps * @run junit/othervm -Djdk.pollerMode=1 BlockingChannelOps
* @run junit/othervm -Djdk.pollerMode=2 BlockingChannelOps
*/ */
/** /**
@ -507,7 +507,7 @@ class BlockingChannelOps {
*/ */
@Test @Test
void testDatagramSocketAdaptorReceive2() throws Exception { void testDatagramSocketAdaptorReceive2() throws Exception {
testDatagramSocketAdaptorReceive(60_1000); testDatagramSocketAdaptorReceive(60_000);
} }
private void testDatagramSocketAdaptorReceive(int timeout) throws Exception { private void testDatagramSocketAdaptorReceive(int timeout) throws Exception {