8200257: (se) More Selector cleanup

Reviewed-by: bpb
This commit is contained in:
Alan Bateman 2018-03-30 08:28:09 +01:00
parent 8a1bee438c
commit 34c94079ed
20 changed files with 457 additions and 548 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -63,8 +63,9 @@ class PollArrayWrapper {
}
// Prepare another pollfd struct for use.
void addEntry(int index, SelectionKeyImpl ski) {
void putEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.channel.getFDVal());
putEventOps(index, 0);
}
// Writes the pollfd entry from the source wrapper at the source index

View file

@ -72,10 +72,9 @@ class SinkChannelImpl
sc.configureBlocking(block);
}
public boolean translateReadyOps(int ops, int initialOps,
SelectionKeyImpl sk) {
int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
int oldOps = sk.nioReadyOps();
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
int intOps = ski.nioInterestOps();
int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0)
@ -83,7 +82,7 @@ class SinkChannelImpl
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
sk.nioReadyOps(newOps);
ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@ -91,22 +90,23 @@ class SinkChannelImpl
((intOps & SelectionKey.OP_WRITE) != 0))
newOps |= SelectionKey.OP_WRITE;
sk.nioReadyOps(newOps);
ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, sk.nioReadyOps(), sk);
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, 0, sk);
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
return translateReadyOps(ops, 0, ski);
}
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
public int translateInterestOps(int ops) {
int newOps = 0;
if ((ops & SelectionKey.OP_WRITE) != 0)
ops = Net.POLLOUT;
sk.selector.putEventOps(sk, ops);
newOps |= Net.POLLOUT;
return newOps;
}
public int write(ByteBuffer src) throws IOException {

View file

@ -71,10 +71,9 @@ class SourceChannelImpl
sc.configureBlocking(block);
}
public boolean translateReadyOps(int ops, int initialOps,
SelectionKeyImpl sk) {
int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
int oldOps = sk.nioReadyOps();
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
int intOps = ski.nioInterestOps();
int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0)
@ -82,7 +81,7 @@ class SourceChannelImpl
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
sk.nioReadyOps(newOps);
ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
@ -90,22 +89,23 @@ class SourceChannelImpl
((intOps & SelectionKey.OP_READ) != 0))
newOps |= SelectionKey.OP_READ;
sk.nioReadyOps(newOps);
ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, sk.nioReadyOps(), sk);
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
return translateReadyOps(ops, ski.nioReadyOps(), ski);
}
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, 0, sk);
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
return translateReadyOps(ops, 0, ski);
}
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
public int translateInterestOps(int ops) {
int newOps = 0;
if ((ops & SelectionKey.OP_READ) != 0)
ops = Net.POLLIN;
sk.selector.putEventOps(sk, ops);
newOps |= Net.POLLIN;
return newOps;
}
public int read(ByteBuffer dst) throws IOException {

View file

@ -23,23 +23,19 @@
* questions.
*/
/*
*/
package sun.nio.ch;
import java.nio.channels.spi.SelectorProvider;
import java.nio.channels.Selector;
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.util.List;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* A multi-threaded implementation of Selector for Windows.
@ -80,9 +76,6 @@ class WindowsSelectorImpl extends SelectorImpl {
// File descriptors corresponding to source and sink
private final int wakeupSourceFd, wakeupSinkFd;
// Lock for close cleanup
private final Object closeLock = new Object();
// Maps file descriptors to their indices in pollArray
private static final class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
@ -103,7 +96,7 @@ class WindowsSelectorImpl extends SelectorImpl {
// class for fdMap entries
private static final class MapEntry {
SelectionKeyImpl ski;
final SelectionKeyImpl ski;
long updateCount = 0;
long clearedCount = 0;
MapEntry(SelectionKeyImpl ski) {
@ -121,6 +114,13 @@ class WindowsSelectorImpl extends SelectorImpl {
private final Object interruptLock = new Object();
private volatile boolean interruptTriggered;
// pending new registrations/updates, queued by implRegister and putEventOps
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateEvents = new ArrayDeque<>();
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
@ -135,11 +135,16 @@ class WindowsSelectorImpl extends SelectorImpl {
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
private void ensureOpen() {
if (!isOpen())
throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue();
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
@ -176,6 +181,42 @@ class WindowsSelectorImpl extends SelectorImpl {
return updated;
}
/**
* Process new registrations and changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.putEntry(totalChannels, ski);
totalChannels++;
MapEntry previous = fdMap.put(ski);
assert previous == null;
}
}
// changes to interest ops
assert updateKeys.size() == updateEvents.size();
while ((ski = updateKeys.pollFirst()) != null) {
int events = updateEvents.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdMap.containsKey(fd)) {
int index = ski.getIndex();
assert index >= 0 && index < totalChannels;
pollWrapper.putEventOps(index, events);
}
}
}
}
// Helper threads wait on this lock for the next poll.
private final StartLock startLock = new StartLock();
@ -503,46 +544,29 @@ class WindowsSelectorImpl extends SelectorImpl {
@Override
protected void implClose() throws IOException {
synchronized (closeLock) {
if (channelArray != null) {
if (pollWrapper != null) {
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
for(int i = 1; i < totalChannels; i++) { // Deregister channels
if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
deregister(channelArray[i]);
SelectableChannel selch = channelArray[i].channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
}
pollWrapper.free();
pollWrapper = null;
channelArray = null;
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}
}
assert !isOpen();
assert Thread.holdsLock(this);
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
pollWrapper.free();
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
fdMap.put(ski);
keys.add(ski);
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
@ -561,47 +585,43 @@ class WindowsSelectorImpl extends SelectorImpl {
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{
int i = ski.getIndex();
assert (i >= 0);
synchronized (closeLock) {
@Override
protected void implDereg(SelectionKeyImpl ski) {
assert !ski.isValid();
assert Thread.holdsLock(this);
if (fdMap.remove(ski) != null) {
int i = ski.getIndex();
assert (i >= 0);
if (i != totalChannels - 1) {
// Copy end one over it
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
pollWrapper, i);
pollWrapper.replaceEntry(pollWrapper, totalChannels-1, pollWrapper, i);
}
ski.setIndex(-1);
}
channelArray[totalChannels - 1] = null;
totalChannels--;
if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
channelArray[totalChannels - 1] = null;
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
keys.remove(ski);
selectedKeys.remove(ski);
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
public void putEventOps(SelectionKeyImpl sk, int ops) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
// make sure this sk has not been removed yet
int index = sk.getIndex();
if (index == -1)
throw new CancelledKeyException();
pollWrapper.putEventOps(index, ops);
if (totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
}
}
@Override
public void putEventOps(SelectionKeyImpl ski, int events) {
ensureOpen();
synchronized (updateLock) {
updateEvents.addLast(events); // events first in case adding key fails
updateKeys.addLast(ski);
}
}
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {