8199791: (se) More Selector cleanup

Reviewed-by: redestad, bpb
This commit is contained in:
Alan Bateman 2018-03-23 14:18:18 +00:00
parent de23920e05
commit 3bb85f5fc5
33 changed files with 754 additions and 1516 deletions

View file

@ -84,17 +84,17 @@ class KQueue {
}
/**
* Returns the file descriptor from a kevent (assuming to be in ident field)
* Returns the file descriptor from a kevent (assuming it is in the ident field)
*/
static int getDescriptor(long address) {
return unsafe.getInt(address + OFFSET_IDENT);
}
static int getFilter(long address) {
static short getFilter(long address) {
return unsafe.getShort(address + OFFSET_FILTER);
}
static int getFlags(long address) {
static short getFlags(long address) {
return unsafe.getShort(address + OFFSET_FLAGS);
}
@ -108,11 +108,11 @@ class KQueue {
private static native int flagsOffset();
static native int kqueue() throws IOException;
static native int create() throws IOException;
static native int keventRegister(int kqpfd, int fd, int filter, int flags);
static native int register(int kqfd, int fd, int filter, int flags);
static native int keventPoll(int kqpfd, long pollAddress, int nevents)
static native int poll(int kqfd, long pollAddress, int nevents, long timeout)
throws IOException;
static {

View file

@ -1,197 +0,0 @@
/*
* Copyright (c) 2011, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* KQueueArrayWrapper.java
* Implementation of Selector using FreeBSD / Mac OS X kqueues
* Derived from Sun's DevPollArrayWrapper
*/
package sun.nio.ch;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import sun.security.action.GetPropertyAction;
/*
* struct kevent { // 32-bit 64-bit
* uintptr_t ident; // 4 8
* short filter; // 2 2
* u_short flags; // 2 2
* u_int fflags; // 4 4
* intptr_t data; // 4 8
* void *udata; // 4 8
* } // Total: 20 32
*
* The implementation works in 32-bit and 64-bit world. We do this by calling a
* native function that actually sets the sizes and offsets of the fields based
* on which mode we're in.
*/
class KQueueArrayWrapper {
// kevent filters
static short EVFILT_READ;
static short EVFILT_WRITE;
// kevent struct
// These fields are now set by initStructSizes in the static initializer.
static short SIZEOF_KEVENT;
static short FD_OFFSET;
static short FILTER_OFFSET;
// kevent array size
static final int NUM_KEVENTS = 128;
// Are we in a 64-bit VM?
static boolean is64bit;
// The kevent array (used for outcoming events only)
private final AllocatedNativeObject keventArray;
private final long keventArrayAddress;
// The kqueue fd
private final int kq;
// The fd of the interrupt line going out
private final int outgoingInterruptFD;
static {
IOUtil.load();
initStructSizes();
String datamodel =
GetPropertyAction.privilegedGetProperty("sun.arch.data.model");
is64bit = "64".equals(datamodel);
}
KQueueArrayWrapper(int fd0, int fd1) throws IOException {
int allocationSize = SIZEOF_KEVENT * NUM_KEVENTS;
keventArray = new AllocatedNativeObject(allocationSize, true);
keventArrayAddress = keventArray.address();
kq = init();
register0(kq, fd0, 1, 0);
outgoingInterruptFD = fd1;
}
// Used to update file description registrations
private static class Update {
SelChImpl channel;
int events;
Update(SelChImpl channel, int events) {
this.channel = channel;
this.events = events;
}
}
private LinkedList<Update> updateList = new LinkedList<Update>();
int getReventOps(int index) {
int result = 0;
int offset = SIZEOF_KEVENT*index + FILTER_OFFSET;
short filter = keventArray.getShort(offset);
// This is all that's necessary based on inspection of usage:
// SinkChannelImpl, SourceChannelImpl, DatagramChannelImpl,
// ServerSocketChannelImpl, SocketChannelImpl
if (filter == EVFILT_READ) {
result |= Net.POLLIN;
} else if (filter == EVFILT_WRITE) {
result |= Net.POLLOUT;
}
return result;
}
int getDescriptor(int index) {
int offset = SIZEOF_KEVENT*index + FD_OFFSET;
/* The ident field is 8 bytes in 64-bit world, however the API wants us
* to return an int. Hence read the 8 bytes but return as an int.
*/
if (is64bit) {
long fd = keventArray.getLong(offset);
assert fd <= Integer.MAX_VALUE;
return (int) fd;
} else {
return keventArray.getInt(offset);
}
}
void setInterest(SelChImpl channel, int events) {
synchronized (updateList) {
// update existing registration
updateList.add(new Update(channel, events));
}
}
void release(SelChImpl channel) {
synchronized (updateList) {
// flush any pending updates
for (Iterator<Update> it = updateList.iterator(); it.hasNext();) {
if (it.next().channel == channel) {
it.remove();
}
}
// remove
register0(kq, channel.getFDVal(), 0, 0);
}
}
void updateRegistrations() {
synchronized (updateList) {
Update u;
while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel;
if (!ch.isOpen())
continue;
register0(kq, ch.getFDVal(), u.events & Net.POLLIN, u.events & Net.POLLOUT);
}
}
}
void close() throws IOException {
FileDispatcherImpl.closeIntFD(kq);
keventArray.free();
}
int poll(long timeout) {
updateRegistrations();
return kevent0(kq, keventArrayAddress, NUM_KEVENTS, timeout);
}
void interrupt() {
interrupt(outgoingInterruptFD);
}
private native int init();
private static native void initStructSizes();
private native void register0(int kq, int fd, int read, int write);
private native int kevent0(int kq, long keventAddress, int keventCount,
long timeout);
private static native void interrupt(int fd);
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,7 +30,11 @@ import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import static sun.nio.ch.KQueue.*;
import static sun.nio.ch.KQueue.EVFILT_READ;
import static sun.nio.ch.KQueue.EVFILT_WRITE;
import static sun.nio.ch.KQueue.EV_ADD;
import static sun.nio.ch.KQueue.EV_ONESHOT;
/**
* AsynchronousChannelGroup implementation based on the BSD kqueue facility.
@ -45,6 +49,9 @@ final class KQueuePort
// kqueue file descriptor
private final int kqfd;
// address of the poll array passed to kqueue_wait
private final long address;
// true if kqueue closed
private boolean closed;
@ -54,9 +61,6 @@ final class KQueuePort
// number of wakeups pending
private final AtomicInteger wakeupCount = new AtomicInteger();
// address of the poll array passed to kqueue_wait
private final long address;
// encapsulates an event for a channel
static class Event {
final PollableChannel channel;
@ -82,28 +86,25 @@ final class KQueuePort
{
super(provider, pool);
// open kqueue
this.kqfd = kqueue();
this.kqfd = KQueue.create();
this.address = KQueue.allocatePollArray(MAX_KEVENTS_TO_POLL);
// create socket pair for wakeup mechanism
int[] sv = new int[2];
try {
socketpair(sv);
// register one end with kqueue
keventRegister(kqfd, sv[0], EVFILT_READ, EV_ADD);
} catch (IOException x) {
close0(kqfd);
throw x;
long fds = IOUtil.makePipe(true);
this.sp = new int[]{(int) (fds >>> 32), (int) fds};
} catch (IOException ioe) {
KQueue.freePollArray(address);
FileDispatcherImpl.closeIntFD(kqfd);
throw ioe;
}
this.sp = sv;
// allocate the poll array
this.address = allocatePollArray(MAX_KEVENTS_TO_POLL);
// register one end with kqueue
KQueue.register(kqfd, sp[0], EVFILT_READ, EV_ADD);
// create the queue and offer the special event to ensure that the first
// threads polls
this.queue = new ArrayBlockingQueue<Event>(MAX_KEVENTS_TO_POLL);
this.queue = new ArrayBlockingQueue<>(MAX_KEVENTS_TO_POLL);
this.queue.offer(NEED_TO_POLL);
}
@ -121,17 +122,18 @@ final class KQueuePort
return;
closed = true;
}
freePollArray(address);
close0(sp[0]);
close0(sp[1]);
close0(kqfd);
try { FileDispatcherImpl.closeIntFD(kqfd); } catch (IOException ioe) { }
try { FileDispatcherImpl.closeIntFD(sp[0]); } catch (IOException ioe) { }
try { FileDispatcherImpl.closeIntFD(sp[1]); } catch (IOException ioe) { }
KQueue.freePollArray(address);
}
private void wakeup() {
if (wakeupCount.incrementAndGet() == 1) {
// write byte to socketpair to force wakeup
try {
interrupt(sp[1]);
IOUtil.write1(sp[1], (byte)0);
} catch (IOException x) {
throw new AssertionError(x);
}
@ -173,9 +175,9 @@ final class KQueuePort
int err = 0;
int flags = (EV_ADD|EV_ONESHOT);
if ((events & Net.POLLIN) > 0)
err = keventRegister(kqfd, fd, EVFILT_READ, flags);
err = KQueue.register(kqfd, fd, EVFILT_READ, flags);
if (err == 0 && (events & Net.POLLOUT) > 0)
err = keventRegister(kqfd, fd, EVFILT_WRITE, flags);
err = KQueue.register(kqfd, fd, EVFILT_WRITE, flags);
if (err != 0)
throw new InternalError("kevent failed: " + err); // should not happen
}
@ -193,7 +195,11 @@ final class KQueuePort
private Event poll() throws IOException {
try {
for (;;) {
int n = keventPoll(kqfd, address, MAX_KEVENTS_TO_POLL);
int n;
do {
n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L);
} while (n == IOStatus.INTERRUPTED);
/*
* 'n' events have been read. Here we map them to their
* corresponding channel in batch and queue n-1 so that
@ -203,14 +209,14 @@ final class KQueuePort
fdToChannelLock.readLock().lock();
try {
while (n-- > 0) {
long keventAddress = getEvent(address, n);
int fd = getDescriptor(keventAddress);
long keventAddress = KQueue.getEvent(address, n);
int fd = KQueue.getDescriptor(keventAddress);
// wakeup
if (fd == sp[0]) {
if (wakeupCount.decrementAndGet() == 0) {
// no more wakeups so drain pipe
drain1(sp[0]);
IOUtil.drain(sp[0]);
}
// queue special event if there are more events
@ -224,7 +230,7 @@ final class KQueuePort
PollableChannel channel = fdToChannel.get(fd);
if (channel != null) {
int filter = getFilter(keventAddress);
int filter = KQueue.getFilter(keventAddress);
int events = 0;
if (filter == EVFILT_READ)
events = Net.POLLIN;
@ -314,18 +320,4 @@ final class KQueuePort
}
}
}
// -- Native methods --
private static native void socketpair(int[] sv) throws IOException;
private static native void interrupt(int fd) throws IOException;
private static native void drain1(int fd) throws IOException;
private static native void close0(int fd);
static {
IOUtil.load();
}
}

View file

@ -23,11 +23,6 @@
* questions.
*/
/*
* KQueueSelectorImpl.java
* Implementation of Selector using FreeBSD / Mac OS X kqueues
*/
package sun.nio.ch;
import java.io.IOException;
@ -36,85 +31,111 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayDeque;
import java.util.BitSet;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
class KQueueSelectorImpl
extends SelectorImpl
{
// File descriptors used for interrupt
import static sun.nio.ch.KQueue.EVFILT_READ;
import static sun.nio.ch.KQueue.EVFILT_WRITE;
import static sun.nio.ch.KQueue.EV_ADD;
import static sun.nio.ch.KQueue.EV_DELETE;
/**
* KQueue based Selector implementation for macOS
*/
class KQueueSelectorImpl extends SelectorImpl {
// maximum number of events to poll in one call to kqueue
private static final int MAX_KEVENTS = 256;
// kqueue file descriptor
private final int kqfd;
// address of poll array (event list) when polling for pending events
private final long pollArrayAddress;
// file descriptors used for interrupt
private final int fd0;
private final int fd1;
// The kqueue manipulator
private final KQueueArrayWrapper kqueueWrapper;
// maps file descriptor to selection key, synchronize on selector
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
// Map from a file descriptor to an entry containing the selection key
private final HashMap<Integer, MapEntry> fdMap;
// file descriptors registered with kqueue, synchronize on selector
private final BitSet registeredReadFilter = new BitSet();
private final BitSet registeredWriteFilter = new BitSet();
// True if this Selector has been closed
private boolean closed;
// pending new registrations/updates, queued by implRegister and putEventOps
private final Object updateLock = new Object();
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
private final Deque<Integer> updateOps = new ArrayDeque<>();
// Lock for interrupt triggering and clearing
// interrupt triggering and clearing
private final Object interruptLock = new Object();
private boolean interruptTriggered;
// used by updateSelectedKeys to handle cases where the same file
// descriptor is polled by more than one filter
private long updateCount;
private int pollCount;
// Used to map file descriptors to a selection key and "update count"
// (see updateSelectedKeys for usage).
private static class MapEntry {
SelectionKeyImpl ski;
long updateCount;
MapEntry(SelectionKeyImpl ski) {
this.ski = ski;
}
}
/**
* Package private constructor called by factory method in
* the abstract superclass Selector.
*/
KQueueSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long fds = IOUtil.makePipe(false);
fd0 = (int)(fds >>> 32);
fd1 = (int)fds;
this.kqfd = KQueue.create();
this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS);
try {
kqueueWrapper = new KQueueArrayWrapper(fd0, fd1);
fdMap = new HashMap<>();
} catch (Throwable t) {
try {
FileDispatcherImpl.closeIntFD(fd0);
} catch (IOException ioe0) {
t.addSuppressed(ioe0);
}
try {
FileDispatcherImpl.closeIntFD(fd1);
} catch (IOException ioe1) {
t.addSuppressed(ioe1);
}
throw t;
long fds = IOUtil.makePipe(false);
this.fd0 = (int) (fds >>> 32);
this.fd1 = (int) fds;
} catch (IOException ioe) {
KQueue.freePollArray(pollArrayAddress);
FileDispatcherImpl.closeIntFD(kqfd);
throw ioe;
}
// register one end of the socket pair for wakeups
KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD);
}
private void ensureOpen() {
if (closed)
if (!isOpen())
throw new ClosedSelectorException();
}
@Override
protected int doSelect(long timeout)
throws IOException
{
ensureOpen();
protected int doSelect(long timeout) throws IOException {
assert Thread.holdsLock(this);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
begin();
numEntries = kqueueWrapper.poll(timeout);
long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
boolean timedPoll = (to > 0);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);
} finally {
end();
}
@ -122,40 +143,101 @@ class KQueueSelectorImpl
return updateSelectedKeys(numEntries);
}
/**
* Process new registrations and changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
SelectionKeyImpl previous = fdToKey.put(fd, ski);
assert previous == null;
assert registeredReadFilter.get(fd) == false;
assert registeredWriteFilter.get(fd) == false;
}
}
// changes to interest ops
assert updateKeys.size() == updateOps.size();
while ((ski = updateKeys.pollFirst()) != null) {
int ops = updateOps.pollFirst();
int fd = ski.channel.getFDVal();
if (ski.isValid() && fdToKey.containsKey(fd)) {
// add or delete interest in read events
if (registeredReadFilter.get(fd)) {
if ((ops & Net.POLLIN) == 0) {
KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
registeredReadFilter.clear(fd);
}
} else if ((ops & Net.POLLIN) != 0) {
KQueue.register(kqfd, fd, EVFILT_READ, EV_ADD);
registeredReadFilter.set(fd);
}
// add or delete interest in write events
if (registeredWriteFilter.get(fd)) {
if ((ops & Net.POLLOUT) == 0) {
KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
registeredWriteFilter.clear(fd);
}
} else if ((ops & Net.POLLOUT) != 0) {
KQueue.register(kqfd, fd, EVFILT_WRITE, EV_ADD);
registeredWriteFilter.set(fd);
}
}
}
}
}
/**
* Update the keys whose fd's have been selected by kqueue.
* Add the ready keys to the selected key set.
* If the interrupt fd has been selected, drain it and clear the interrupt.
*/
private int updateSelectedKeys(int numEntries)
throws IOException
{
private int updateSelectedKeys(int numEntries) throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(nioSelectedKeys());
int numKeysUpdated = 0;
boolean interrupted = false;
// A file descriptor may be registered with kqueue with more than one
// filter and so there may be more than one event for a fd. The update
// count in the MapEntry tracks when the fd was last updated and this
// ensures that the ready ops are updated rather than replaced by a
// second or subsequent event.
updateCount++;
// filter and so there may be more than one event for a fd. The poll
// count is incremented here and compared against the SelectionKey's
// "lastPolled" field. This ensures that the ready ops is updated rather
// than replaced when a file descriptor is polled by both the read and
// write filter.
pollCount++;
for (int i = 0; i < numEntries; i++) {
int nextFD = kqueueWrapper.getDescriptor(i);
if (nextFD == fd0) {
long kevent = KQueue.getEvent(pollArrayAddress, i);
int fd = KQueue.getDescriptor(kevent);
if (fd == fd0) {
interrupted = true;
} else {
MapEntry me = fdMap.get(Integer.valueOf(nextFD));
if (me != null) {
int rOps = kqueueWrapper.getReventOps(i);
SelectionKeyImpl ski = me.ski;
SelectionKeyImpl ski = fdToKey.get(fd);
if (ski != null) {
int rOps = 0;
short filter = KQueue.getFilter(kevent);
if (filter == EVFILT_READ) {
rOps |= Net.POLLIN;
} else if (filter == EVFILT_WRITE) {
rOps |= Net.POLLOUT;
}
if (selectedKeys.contains(ski)) {
// first time this file descriptor has been encountered on this
// update?
if (me.updateCount != updateCount) {
// file descriptor may be polled more than once per poll
if (ski.lastPolled != pollCount) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
me.updateCount = updateCount;
ski.lastPolled = pollCount;
}
} else {
// ready ops have already been set on this update
@ -166,7 +248,7 @@ class KQueueSelectorImpl
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
me.updateCount = updateCount;
ski.lastPolled = pollCount;
}
}
}
@ -181,63 +263,90 @@ class KQueueSelectorImpl
@Override
protected void implClose() throws IOException {
if (!closed) {
closed = true;
assert !isOpen();
assert Thread.holdsLock(this);
assert Thread.holdsLock(nioKeys());
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
kqueueWrapper.close();
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
FileDispatcherImpl.closeIntFD(kqfd);
KQueue.freePollArray(pollArrayAddress);
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
i.remove();
}
FileDispatcherImpl.closeIntFD(fd0);
FileDispatcherImpl.closeIntFD(fd1);
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
i.remove();
}
}
@Override
protected void implRegister(SelectionKeyImpl ski) {
assert Thread.holdsLock(nioKeys());
ensureOpen();
int fd = IOUtil.fdVal(ski.channel.getFD());
fdMap.put(Integer.valueOf(fd), new MapEntry(ski));
synchronized (updateLock) {
newKeys.addLast(ski);
}
keys.add(ski);
}
@Override
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert !ski.isValid();
assert Thread.holdsLock(this);
assert Thread.holdsLock(nioKeys());
assert Thread.holdsLock(nioSelectedKeys());
int fd = ski.channel.getFDVal();
fdMap.remove(Integer.valueOf(fd));
kqueueWrapper.release(ski.channel);
keys.remove(ski);
fdToKey.remove(fd);
if (registeredReadFilter.get(fd)) {
KQueue.register(kqfd, fd, EVFILT_READ, EV_DELETE);
registeredReadFilter.clear(fd);
}
if (registeredWriteFilter.get(fd)) {
KQueue.register(kqfd, fd, EVFILT_WRITE, EV_DELETE);
registeredWriteFilter.clear(fd);
}
selectedKeys.remove(ski);
keys.remove(ski);
// remove from channel's key set
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
((SelChImpl) selch).kill();
}
@Override
public void putEventOps(SelectionKeyImpl ski, int ops) {
ensureOpen();
kqueueWrapper.setInterest(ski.channel, ops);
synchronized (updateLock) {
updateOps.addLast(ops); // ops first in case adding the key fails
updateKeys.addLast(ski);
}
}
@Override
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
kqueueWrapper.interrupt();
try {
IOUtil.write1(fd1, (byte)0);
} catch (IOException ioe) {
throw new InternalError(ioe);
}
interruptTriggered = true;
}
}

View file

@ -23,17 +23,10 @@
* questions.
*/
/*
* KQueueSelectorProvider.java
* Implementation of Selector using FreeBSD / Mac OS X kqueues
* Derived from Sun's DevPollSelectorProvider
*/
package sun.nio.ch;
import java.io.IOException;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.channels.spi.AbstractSelector;
public class KQueueSelectorProvider
extends SelectorProviderImpl