mirror of
https://github.com/openjdk/jdk.git
synced 2025-08-27 23:04:50 +02:00
8200179: (se) More Selector cleanup
Reviewed-by: bpb
This commit is contained in:
parent
4b6cd06e3d
commit
187bf57418
19 changed files with 869 additions and 1247 deletions
|
@ -26,67 +26,71 @@
|
|||
package sun.nio.ch;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.*;
|
||||
import java.nio.channels.spi.*;
|
||||
import java.util.*;
|
||||
import java.nio.channels.ClosedSelectorException;
|
||||
import java.nio.channels.SelectableChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.BitSet;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static sun.nio.ch.DevPollArrayWrapper.NUM_POLLFDS;
|
||||
import static sun.nio.ch.DevPollArrayWrapper.POLLREMOVE;
|
||||
|
||||
/**
|
||||
* An implementation of Selector for Solaris.
|
||||
* Solaris /dev/poll based Selector implementation
|
||||
*/
|
||||
|
||||
class DevPollSelectorImpl
|
||||
extends SelectorImpl
|
||||
{
|
||||
// File descriptors used for interrupt
|
||||
// provides access to /dev/poll driver
|
||||
private final DevPollArrayWrapper pollWrapper;
|
||||
|
||||
// file descriptors used for interrupt
|
||||
private final int fd0;
|
||||
private final int fd1;
|
||||
|
||||
// The poll object
|
||||
private final DevPollArrayWrapper pollWrapper;
|
||||
// maps file descriptor to selection key, synchronize on selector
|
||||
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
|
||||
|
||||
// Maps from file descriptors to keys
|
||||
private final Map<Integer, SelectionKeyImpl> fdToKey;
|
||||
// file descriptors registered with /dev/poll, synchronize on selector
|
||||
private final BitSet registered = new BitSet();
|
||||
|
||||
// True if this Selector has been closed
|
||||
private boolean closed;
|
||||
// pending new registrations/updates, queued by implRegister and putEventOps
|
||||
private final Object updateLock = new Object();
|
||||
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
|
||||
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
|
||||
private final Deque<Integer> updateOps = new ArrayDeque<>();
|
||||
|
||||
// Lock for close/cleanup
|
||||
private final Object closeLock = new Object();
|
||||
|
||||
// Lock for interrupt triggering and clearing
|
||||
// interrupt triggering and clearing
|
||||
private final Object interruptLock = new Object();
|
||||
private boolean interruptTriggered;
|
||||
|
||||
/**
|
||||
* Package private constructor called by factory method in
|
||||
* the abstract superclass Selector.
|
||||
*/
|
||||
|
||||
DevPollSelectorImpl(SelectorProvider sp) throws IOException {
|
||||
super(sp);
|
||||
long pipeFds = IOUtil.makePipe(false);
|
||||
fd0 = (int) (pipeFds >>> 32);
|
||||
fd1 = (int) pipeFds;
|
||||
this.pollWrapper = new DevPollArrayWrapper();
|
||||
try {
|
||||
pollWrapper = new DevPollArrayWrapper();
|
||||
pollWrapper.initInterrupt(fd0, fd1);
|
||||
fdToKey = new HashMap<>();
|
||||
} catch (Throwable t) {
|
||||
try {
|
||||
FileDispatcherImpl.closeIntFD(fd0);
|
||||
} catch (IOException ioe0) {
|
||||
t.addSuppressed(ioe0);
|
||||
}
|
||||
try {
|
||||
FileDispatcherImpl.closeIntFD(fd1);
|
||||
} catch (IOException ioe1) {
|
||||
t.addSuppressed(ioe1);
|
||||
}
|
||||
throw t;
|
||||
long fds = IOUtil.makePipe(false);
|
||||
this.fd0 = (int) (fds >>> 32);
|
||||
this.fd1 = (int) fds;
|
||||
} catch (IOException ioe) {
|
||||
pollWrapper.close();
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
// register one end of the socket pair for wakeups
|
||||
pollWrapper.register(fd0, Net.POLLIN);
|
||||
}
|
||||
|
||||
private void ensureOpen() {
|
||||
if (closed)
|
||||
if (!isOpen())
|
||||
throw new ClosedSelectorException();
|
||||
}
|
||||
|
||||
|
@ -94,62 +98,123 @@ class DevPollSelectorImpl
|
|||
protected int doSelect(long timeout)
|
||||
throws IOException
|
||||
{
|
||||
ensureOpen();
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
int numEntries;
|
||||
processUpdateQueue();
|
||||
processDeregisterQueue();
|
||||
try {
|
||||
begin();
|
||||
pollWrapper.poll(timeout);
|
||||
numEntries = pollWrapper.poll(timeout);
|
||||
} finally {
|
||||
end();
|
||||
}
|
||||
processDeregisterQueue();
|
||||
int numKeysUpdated = updateSelectedKeys();
|
||||
if (pollWrapper.interrupted()) {
|
||||
// Clear the wakeup pipe
|
||||
pollWrapper.putReventOps(pollWrapper.interruptedIndex(), 0);
|
||||
synchronized (interruptLock) {
|
||||
pollWrapper.clearInterrupted();
|
||||
IOUtil.drain(fd0);
|
||||
interruptTriggered = false;
|
||||
}
|
||||
}
|
||||
return numKeysUpdated;
|
||||
return updateSelectedKeys(numEntries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the keys whose fd's have been selected by the devpoll
|
||||
* driver. Add the ready keys to the ready queue.
|
||||
* Process new registrations and changes to the interest ops.
|
||||
*/
|
||||
private int updateSelectedKeys() {
|
||||
int entries = pollWrapper.updated;
|
||||
int numKeysUpdated = 0;
|
||||
for (int i=0; i<entries; i++) {
|
||||
int nextFD = pollWrapper.getDescriptor(i);
|
||||
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
|
||||
// ski is null in the case of an interrupt
|
||||
if (ski != null) {
|
||||
int rOps = pollWrapper.getReventOps(i);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
|
||||
numKeysUpdated++;
|
||||
private void processUpdateQueue() throws IOException {
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
synchronized (updateLock) {
|
||||
SelectionKeyImpl ski;
|
||||
|
||||
// new registrations
|
||||
while ((ski = newKeys.pollFirst()) != null) {
|
||||
if (ski.isValid()) {
|
||||
SelChImpl ch = ski.channel;
|
||||
int fd = ch.getFDVal();
|
||||
SelectionKeyImpl previous = fdToKey.put(fd, ski);
|
||||
assert previous == null;
|
||||
assert registered.get(fd) == false;
|
||||
}
|
||||
}
|
||||
|
||||
// Translate the queued updates to changes to the set of monitored
|
||||
// file descriptors. The changes are written to the /dev/poll driver
|
||||
// in bulk.
|
||||
assert updateKeys.size() == updateOps.size();
|
||||
int index = 0;
|
||||
while ((ski = updateKeys.pollFirst()) != null) {
|
||||
int ops = updateOps.pollFirst();
|
||||
int fd = ski.channel.getFDVal();
|
||||
if (ski.isValid() && fdToKey.containsKey(fd)) {
|
||||
if (registered.get(fd)) {
|
||||
if (ops == 0) {
|
||||
// remove file descriptor
|
||||
pollWrapper.putPollFD(index++, fd, POLLREMOVE);
|
||||
registered.clear(fd);
|
||||
} else {
|
||||
// change events
|
||||
pollWrapper.putPollFD(index++, fd, POLLREMOVE);
|
||||
pollWrapper.putPollFD(index++, fd, (short)ops);
|
||||
}
|
||||
} else if (ops != 0) {
|
||||
// add file descriptor
|
||||
pollWrapper.putPollFD(index++, fd, (short)ops);
|
||||
registered.set(fd);
|
||||
}
|
||||
} else {
|
||||
ski.channel.translateAndSetReadyOps(rOps, ski);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
if (index > (NUM_POLLFDS-2)) {
|
||||
pollWrapper.registerMultiple(index);
|
||||
index = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// write any remaining changes
|
||||
if (index > 0)
|
||||
pollWrapper.registerMultiple(index);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the keys whose fd's have been selected by the /dev/poll.
|
||||
* Add the ready keys to the ready queue.
|
||||
*/
|
||||
private int updateSelectedKeys(int numEntries) throws IOException {
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioSelectedKeys());
|
||||
|
||||
boolean interrupted = false;
|
||||
int numKeysUpdated = 0;
|
||||
for (int i=0; i<numEntries; i++) {
|
||||
int fd = pollWrapper.getDescriptor(i);
|
||||
if (fd == fd0) {
|
||||
interrupted = true;
|
||||
} else {
|
||||
SelectionKeyImpl ski = fdToKey.get(fd);
|
||||
if (ski != null) {
|
||||
int rOps = pollWrapper.getReventOps(i);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
|
||||
numKeysUpdated++;
|
||||
}
|
||||
} else {
|
||||
ski.channel.translateAndSetReadyOps(rOps, ski);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (interrupted) {
|
||||
clearInterrupt();
|
||||
}
|
||||
|
||||
return numKeysUpdated;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void implClose() throws IOException {
|
||||
if (closed)
|
||||
return;
|
||||
closed = true;
|
||||
assert !isOpen();
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioKeys());
|
||||
|
||||
// prevent further wakeup
|
||||
synchronized (interruptLock) {
|
||||
|
@ -174,42 +239,67 @@ class DevPollSelectorImpl
|
|||
|
||||
@Override
|
||||
protected void implRegister(SelectionKeyImpl ski) {
|
||||
int fd = IOUtil.fdVal(ski.channel.getFD());
|
||||
fdToKey.put(Integer.valueOf(fd), ski);
|
||||
assert Thread.holdsLock(nioKeys());
|
||||
ensureOpen();
|
||||
synchronized (updateLock) {
|
||||
newKeys.addLast(ski);
|
||||
}
|
||||
keys.add(ski);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void implDereg(SelectionKeyImpl ski) throws IOException {
|
||||
int i = ski.getIndex();
|
||||
assert (i >= 0);
|
||||
assert !ski.isValid();
|
||||
assert Thread.holdsLock(this);
|
||||
assert Thread.holdsLock(nioKeys());
|
||||
assert Thread.holdsLock(nioSelectedKeys());
|
||||
|
||||
int fd = ski.channel.getFDVal();
|
||||
fdToKey.remove(Integer.valueOf(fd));
|
||||
pollWrapper.release(fd);
|
||||
ski.setIndex(-1);
|
||||
keys.remove(ski);
|
||||
fdToKey.remove(fd);
|
||||
if (registered.get(fd)) {
|
||||
pollWrapper.register(fd, POLLREMOVE);
|
||||
registered.clear(fd);
|
||||
}
|
||||
|
||||
selectedKeys.remove(ski);
|
||||
deregister((AbstractSelectionKey)ski);
|
||||
keys.remove(ski);
|
||||
|
||||
// remove from channel's key set
|
||||
deregister(ski);
|
||||
|
||||
SelectableChannel selch = ski.channel();
|
||||
if (!selch.isOpen() && !selch.isRegistered())
|
||||
((SelChImpl)selch).kill();
|
||||
((SelChImpl) selch).kill();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putEventOps(SelectionKeyImpl sk, int ops) {
|
||||
public void putEventOps(SelectionKeyImpl ski, int ops) {
|
||||
ensureOpen();
|
||||
int fd = IOUtil.fdVal(sk.channel.getFD());
|
||||
pollWrapper.setInterest(fd, ops);
|
||||
synchronized (updateLock) {
|
||||
updateOps.addLast(ops); // ops first in case adding the key fails
|
||||
updateKeys.addLast(ski);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Selector wakeup() {
|
||||
synchronized (interruptLock) {
|
||||
if (!interruptTriggered) {
|
||||
pollWrapper.interrupt();
|
||||
try {
|
||||
IOUtil.write1(fd1, (byte)0);
|
||||
} catch (IOException ioe) {
|
||||
throw new InternalError(ioe);
|
||||
}
|
||||
interruptTriggered = true;
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
private void clearInterrupt() throws IOException {
|
||||
synchronized (interruptLock) {
|
||||
IOUtil.drain(fd0);
|
||||
interruptTriggered = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue