mirror of
https://github.com/openjdk/jdk.git
synced 2025-08-28 15:24:43 +02:00
8200583: (se) Selector clean-up, part 4
Reviewed-by: bpb, chegar
This commit is contained in:
parent
9968548d57
commit
d185d65b69
11 changed files with 156 additions and 269 deletions
|
@ -55,17 +55,14 @@ class DevPollSelectorImpl
|
|||
// maps file descriptor to selection key, synchronize on selector
|
||||
private final Map<Integer, SelectionKeyImpl> fdToKey = new HashMap<>();
|
||||
|
||||
// pending new registrations/updates, queued by implRegister and putEventOps
|
||||
// pending new registrations/updates, queued by setEventOps
|
||||
private final Object updateLock = new Object();
|
||||
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
|
||||
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
|
||||
private final Deque<Integer> updateEvents = new ArrayDeque<>();
|
||||
|
||||
// interrupt triggering and clearing
|
||||
private final Object interruptLock = new Object();
|
||||
private boolean interruptTriggered;
|
||||
|
||||
|
||||
DevPollSelectorImpl(SelectorProvider sp) throws IOException {
|
||||
super(sp);
|
||||
this.pollWrapper = new DevPollArrayWrapper();
|
||||
|
@ -88,18 +85,34 @@ class DevPollSelectorImpl
|
|||
}
|
||||
|
||||
@Override
|
||||
protected int doSelect(long timeout)
|
||||
throws IOException
|
||||
{
|
||||
protected int doSelect(long timeout) throws IOException {
|
||||
assert Thread.holdsLock(this);
|
||||
boolean blocking = (timeout != 0);
|
||||
|
||||
long to = timeout;
|
||||
boolean blocking = (to != 0);
|
||||
boolean timedPoll = (to > 0);
|
||||
|
||||
int numEntries;
|
||||
processUpdateQueue();
|
||||
processDeregisterQueue();
|
||||
try {
|
||||
begin(blocking);
|
||||
numEntries = pollWrapper.poll(timeout);
|
||||
|
||||
do {
|
||||
long startTime = timedPoll ? System.nanoTime() : 0;
|
||||
numEntries = pollWrapper.poll(to);
|
||||
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
|
||||
// timed poll interrupted so need to adjust timeout
|
||||
long adjust = System.nanoTime() - startTime;
|
||||
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
|
||||
if (to <= 0) {
|
||||
// timeout expired so no retry
|
||||
numEntries = 0;
|
||||
}
|
||||
}
|
||||
} while (numEntries == IOStatus.INTERRUPTED);
|
||||
assert IOStatus.check(numEntries);
|
||||
|
||||
} finally {
|
||||
end(blocking);
|
||||
}
|
||||
|
@ -108,7 +121,7 @@ class DevPollSelectorImpl
|
|||
}
|
||||
|
||||
/**
|
||||
* Process new registrations and changes to the interest ops.
|
||||
* Process changes to the interest ops.
|
||||
*/
|
||||
private void processUpdateQueue() throws IOException {
|
||||
assert Thread.holdsLock(this);
|
||||
|
@ -116,25 +129,18 @@ class DevPollSelectorImpl
|
|||
synchronized (updateLock) {
|
||||
SelectionKeyImpl ski;
|
||||
|
||||
// new registrations
|
||||
while ((ski = newKeys.pollFirst()) != null) {
|
||||
if (ski.isValid()) {
|
||||
int fd = ski.channel.getFDVal();
|
||||
SelectionKeyImpl previous = fdToKey.put(fd, ski);
|
||||
assert previous == null;
|
||||
assert ski.registeredEvents() == 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Translate the queued updates to changes to the set of monitored
|
||||
// file descriptors. The changes are written to the /dev/poll driver
|
||||
// in bulk.
|
||||
assert updateKeys.size() == updateEvents.size();
|
||||
int index = 0;
|
||||
while ((ski = updateKeys.pollFirst()) != null) {
|
||||
int newEvents = updateEvents.pollFirst();
|
||||
int fd = ski.channel.getFDVal();
|
||||
if (ski.isValid() && fdToKey.containsKey(fd)) {
|
||||
if (ski.isValid()) {
|
||||
int fd = ski.getFDVal();
|
||||
// add to fdToKey if needed
|
||||
SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
|
||||
assert (previous == null) || (previous == ski);
|
||||
|
||||
int newEvents = ski.translateInterestOps();
|
||||
int registeredEvents = ski.registeredEvents();
|
||||
if (newEvents != registeredEvents) {
|
||||
if (registeredEvents != 0)
|
||||
|
@ -178,11 +184,11 @@ class DevPollSelectorImpl
|
|||
if (ski != null) {
|
||||
int rOps = pollWrapper.getReventOps(i);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
|
||||
if (ski.translateAndSetReadyOps(rOps)) {
|
||||
numKeysUpdated++;
|
||||
}
|
||||
} else {
|
||||
ski.channel.translateAndSetReadyOps(rOps, ski);
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
|
@ -214,20 +220,13 @@ class DevPollSelectorImpl
|
|||
FileDispatcherImpl.closeIntFD(fd1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void implRegister(SelectionKeyImpl ski) {
|
||||
ensureOpen();
|
||||
synchronized (updateLock) {
|
||||
newKeys.addLast(ski);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void implDereg(SelectionKeyImpl ski) throws IOException {
|
||||
assert !ski.isValid();
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
int fd = ski.channel.getFDVal();
|
||||
int fd = ski.getFDVal();
|
||||
if (fdToKey.remove(fd) != null) {
|
||||
if (ski.registeredEvents() != 0) {
|
||||
pollWrapper.register(fd, POLLREMOVE);
|
||||
|
@ -239,10 +238,9 @@ class DevPollSelectorImpl
|
|||
}
|
||||
|
||||
@Override
|
||||
public void putEventOps(SelectionKeyImpl ski, int events) {
|
||||
public void setEventOps(SelectionKeyImpl ski) {
|
||||
ensureOpen();
|
||||
synchronized (updateLock) {
|
||||
updateEvents.addLast(events); // events first in case adding key fails
|
||||
updateKeys.addLast(ski);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,12 +72,10 @@ class EventPortSelectorImpl
|
|||
// the last update operation, incremented by processUpdateQueue
|
||||
private int lastUpdate;
|
||||
|
||||
// pending new registrations/updates, queued by implRegister, putEventOps,
|
||||
// and updateSelectedKeys
|
||||
// pending new registrations/updates, queued by setEventOps and
|
||||
// updateSelectedKeys
|
||||
private final Object updateLock = new Object();
|
||||
private final Deque<SelectionKeyImpl> newKeys = new ArrayDeque<>();
|
||||
private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
|
||||
private final Deque<Integer> updateEvents = new ArrayDeque<>();
|
||||
|
||||
// interrupt triggering and clearing
|
||||
private final Object interruptLock = new Object();
|
||||
|
@ -146,23 +144,14 @@ class EventPortSelectorImpl
|
|||
|
||||
synchronized (updateLock) {
|
||||
SelectionKeyImpl ski;
|
||||
|
||||
// new registrations
|
||||
while ((ski = newKeys.pollFirst()) != null) {
|
||||
if (ski.isValid()) {
|
||||
int fd = ski.channel.getFDVal();
|
||||
SelectionKeyImpl previous = fdToKey.put(fd, ski);
|
||||
assert previous == null;
|
||||
assert ski.registeredEvents() == 0;
|
||||
}
|
||||
}
|
||||
|
||||
// changes to interest ops
|
||||
assert updateKeys.size() == updateEvents.size();
|
||||
while ((ski = updateKeys.pollFirst()) != null) {
|
||||
int newEvents = updateEvents.pollFirst();
|
||||
int fd = ski.channel.getFDVal();
|
||||
if (ski.isValid() && fdToKey.containsKey(fd)) {
|
||||
if (ski.isValid()) {
|
||||
int fd = ski.getFDVal();
|
||||
// add to fdToKey if needed
|
||||
SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
|
||||
assert (previous == null) || (previous == ski);
|
||||
|
||||
int newEvents = ski.translateInterestOps();
|
||||
if (newEvents != ski.registeredEvents()) {
|
||||
if (newEvents == 0) {
|
||||
port_dissociate(pfd, PORT_SOURCE_FD, fd);
|
||||
|
@ -199,22 +188,20 @@ class EventPortSelectorImpl
|
|||
if (ski != null) {
|
||||
int rOps = getEventOps(i);
|
||||
if (selectedKeys.contains(ski)) {
|
||||
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
|
||||
if (ski.translateAndSetReadyOps(rOps)) {
|
||||
numKeysUpdated++;
|
||||
}
|
||||
} else {
|
||||
ski.channel.translateAndSetReadyOps(rOps, ski);
|
||||
ski.translateAndSetReadyOps(rOps);
|
||||
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
|
||||
selectedKeys.add(ski);
|
||||
numKeysUpdated++;
|
||||
}
|
||||
}
|
||||
|
||||
// re-queue key to head so that it is re-associated at
|
||||
// next select (and before other changes)
|
||||
updateEvents.addFirst(ski.registeredEvents());
|
||||
updateKeys.addFirst(ski);
|
||||
// re-queue key so it re-associated at next select
|
||||
ski.registeredEvents(0);
|
||||
updateKeys.addLast(ski);
|
||||
}
|
||||
} else if (source == PORT_SOURCE_USER) {
|
||||
interrupted = true;
|
||||
|
@ -244,20 +231,12 @@ class EventPortSelectorImpl
|
|||
pollArray.free();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void implRegister(SelectionKeyImpl ski) {
|
||||
ensureOpen();
|
||||
synchronized (updateLock) {
|
||||
newKeys.addLast(ski);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void implDereg(SelectionKeyImpl ski) throws IOException {
|
||||
assert !ski.isValid();
|
||||
assert Thread.holdsLock(this);
|
||||
|
||||
int fd = ski.channel.getFDVal();
|
||||
int fd = ski.getFDVal();
|
||||
if (fdToKey.remove(fd) != null) {
|
||||
if (ski.registeredEvents() != 0) {
|
||||
port_dissociate(pfd, PORT_SOURCE_FD, fd);
|
||||
|
@ -269,10 +248,9 @@ class EventPortSelectorImpl
|
|||
}
|
||||
|
||||
@Override
|
||||
public void putEventOps(SelectionKeyImpl ski, int events) {
|
||||
public void setEventOps(SelectionKeyImpl ski) {
|
||||
ensureOpen();
|
||||
synchronized (updateLock) {
|
||||
updateEvents.addLast(events); // events first in case adding key fails
|
||||
updateKeys.addLast(ski);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,84 +23,20 @@
|
|||
* questions.
|
||||
*/
|
||||
|
||||
#include <sys/devpoll.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <poll.h>
|
||||
|
||||
#include "jni.h"
|
||||
#include "jni_util.h"
|
||||
#include "jvm.h"
|
||||
#include "jlong.h"
|
||||
#include "nio.h"
|
||||
#include "nio_util.h"
|
||||
|
||||
#include "sun_nio_ch_DevPollArrayWrapper.h"
|
||||
#include <poll.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef uint32_t caddr32_t;
|
||||
|
||||
/* /dev/poll ioctl */
|
||||
#define DPIOC (0xD0 << 8)
|
||||
#define DP_POLL (DPIOC | 1) /* poll on fds in cached in /dev/poll */
|
||||
#define DP_ISPOLLED (DPIOC | 2) /* is this fd cached in /dev/poll */
|
||||
#define DEVPOLLSIZE 1000 /* /dev/poll table size increment */
|
||||
#define POLLREMOVE 0x0800 /* Removes fd from monitored set */
|
||||
|
||||
/*
|
||||
* /dev/poll DP_POLL ioctl format
|
||||
*/
|
||||
typedef struct dvpoll {
|
||||
pollfd_t *dp_fds; /* pollfd array */
|
||||
nfds_t dp_nfds; /* num of pollfd's in dp_fds[] */
|
||||
int dp_timeout; /* time out in millisec */
|
||||
} dvpoll_t;
|
||||
|
||||
typedef struct dvpoll32 {
|
||||
caddr32_t dp_fds; /* pollfd array */
|
||||
uint32_t dp_nfds; /* num of pollfd's in dp_fds[] */
|
||||
int32_t dp_timeout; /* time out in millisec */
|
||||
} dvpoll32_t;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#define RESTARTABLE(_cmd, _result) do { \
|
||||
do { \
|
||||
_result = _cmd; \
|
||||
} while((_result == -1) && (errno == EINTR)); \
|
||||
} while(0)
|
||||
|
||||
static int
|
||||
idevpoll(jint wfd, int dpctl, struct dvpoll a)
|
||||
{
|
||||
jlong start, now;
|
||||
int remaining = a.dp_timeout;
|
||||
struct timeval t;
|
||||
int diff;
|
||||
|
||||
gettimeofday(&t, NULL);
|
||||
start = t.tv_sec * 1000 + t.tv_usec / 1000;
|
||||
|
||||
for (;;) {
|
||||
/* poll(7d) ioctl does not return remaining count */
|
||||
int res = ioctl(wfd, dpctl, &a);
|
||||
if (res < 0 && errno == EINTR) {
|
||||
if (remaining >= 0) {
|
||||
gettimeofday(&t, NULL);
|
||||
now = t.tv_sec * 1000 + t.tv_usec / 1000;
|
||||
diff = now - start;
|
||||
remaining -= diff;
|
||||
if (diff < 0 || remaining <= 0) {
|
||||
return 0;
|
||||
}
|
||||
start = now;
|
||||
a.dp_timeout = remaining;
|
||||
}
|
||||
} else {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
JNIEXPORT jint JNICALL
|
||||
Java_sun_nio_ch_DevPollArrayWrapper_init(JNIEnv *env, jobject this)
|
||||
|
@ -153,26 +89,24 @@ Java_sun_nio_ch_DevPollArrayWrapper_registerMultiple(JNIEnv *env, jobject this,
|
|||
|
||||
JNIEXPORT jint JNICALL
|
||||
Java_sun_nio_ch_DevPollArrayWrapper_poll0(JNIEnv *env, jobject this,
|
||||
jlong address, jint numfds,
|
||||
jlong timeout, jint wfd)
|
||||
jlong address, jint numfds,
|
||||
jlong timeout, jint wfd)
|
||||
{
|
||||
struct dvpoll a;
|
||||
void *pfd = (void *) jlong_to_ptr(address);
|
||||
int result = 0;
|
||||
int result;
|
||||
|
||||
a.dp_fds = pfd;
|
||||
a.dp_nfds = numfds;
|
||||
a.dp_timeout = (int)timeout;
|
||||
|
||||
if (timeout <= 0) { /* Indefinite or no wait */
|
||||
RESTARTABLE (ioctl(wfd, DP_POLL, &a), result);
|
||||
} else { /* Bounded wait; bounded restarts */
|
||||
result = idevpoll(wfd, DP_POLL, a);
|
||||
}
|
||||
|
||||
result = ioctl(wfd, DP_POLL, &a);
|
||||
if (result < 0) {
|
||||
JNU_ThrowIOExceptionWithLastError(env, "Error reading driver");
|
||||
return -1;
|
||||
if (errno == EINTR) {
|
||||
return IOS_INTERRUPTED;
|
||||
} else {
|
||||
JNU_ThrowIOExceptionWithLastError(env, "Error reading driver");
|
||||
return IOS_THROWN;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue