8247614: java/nio/channels/DatagramChannel/Connect.java timed out

Reviewed-by: dfuchs, alanb
This commit is contained in:
Conor Cleary 2020-10-29 09:57:09 +00:00 committed by Daniel Fuchs
parent 38574d5169
commit ea26ff1142

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2001, 2020, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -24,7 +24,6 @@
/* @test /* @test
* @bug 4313882 7183800 * @bug 4313882 7183800
* @summary Test DatagramChannel's send and receive methods * @summary Test DatagramChannel's send and receive methods
* @author Mike McCloskey
*/ */
import java.io.*; import java.io.*;
@ -32,7 +31,11 @@ import java.net.*;
import java.nio.*; import java.nio.*;
import java.nio.channels.*; import java.nio.channels.*;
import java.nio.charset.*; import java.nio.charset.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Stream;
public class Connect { public class Connect {
@ -43,124 +46,127 @@ public class Connect {
} }
static void test() throws Exception { static void test() throws Exception {
Reactor r = new Reactor(); ExecutorService threadPool = Executors.newCachedThreadPool();
Actor a = new Actor(r.port()); try (Reactor r = new Reactor();
invoke(a, r); Actor a = new Actor(r.getSocketAddress())
) {
invoke(threadPool, a, r);
} finally {
threadPool.shutdown();
}
} }
static void invoke(Sprintable reader, Sprintable writer) throws Exception { static void invoke(ExecutorService e, Runnable reader, Runnable writer) throws CompletionException {
CompletableFuture<Void> f1 = CompletableFuture.runAsync(writer, e);
Thread writerThread = new Thread(writer); CompletableFuture<Void> f2 = CompletableFuture.runAsync(reader, e);
writerThread.start(); wait(f1, f2);
Thread readerThread = new Thread(reader);
readerThread.start();
writerThread.join();
readerThread.join();
reader.throwException();
writer.throwException();
} }
public interface Sprintable extends Runnable {
public void throwException() throws Exception; // This method waits for either one of the given futures to complete exceptionally
// or for all of the given futures to complete successfully.
private static void wait(CompletableFuture<?>... futures) throws CompletionException {
CompletableFuture<?> future = CompletableFuture.allOf(futures);
Stream.of(futures)
.forEach(f -> f.exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
}));
future.join();
} }
public static class Actor implements Sprintable { public static class Actor implements AutoCloseable, Runnable {
final int port; final SocketAddress socketAddress;
Exception e = null; final DatagramChannel dc;
Actor(int port) { Actor(SocketAddress socketAddress) throws IOException {
this.port = port; this.socketAddress = socketAddress;
} dc = DatagramChannel.open();
public void throwException() throws Exception {
if (e != null)
throw e;
} }
public void run() { public void run() {
try { try {
DatagramChannel dc = DatagramChannel.open();
// Send a message
ByteBuffer bb = ByteBuffer.allocateDirect(256); ByteBuffer bb = ByteBuffer.allocateDirect(256);
bb.put("hello".getBytes()); bb.put("hello".getBytes());
bb.flip(); bb.flip();
InetAddress address = InetAddress.getLocalHost(); dc.connect(socketAddress);
if (address.isLoopbackAddress()) {
address = InetAddress.getLoopbackAddress(); // Send a message
} log.println("Actor attempting to write to Reactor at " + socketAddress.toString());
InetSocketAddress isa = new InetSocketAddress(address, port);
dc.connect(isa);
dc.write(bb); dc.write(bb);
// Try to send to some other address // Try to send to some other address
address = InetAddress.getLocalHost();
InetSocketAddress bogus = new InetSocketAddress(address, 3333);
try { try {
dc.send(bb, bogus); int port = dc.socket().getLocalPort();
throw new RuntimeException("Allowed bogus send while connected"); InetAddress loopback = InetAddress.getLoopbackAddress();
InetSocketAddress otherAddress = new InetSocketAddress(loopback, (port == 3333 ? 3332 : 3333));
log.println("Testing if Actor throws AlreadyConnectedException" + otherAddress.toString());
dc.send(bb, otherAddress);
throw new RuntimeException("Actor allowed send to other address while already connected");
} catch (AlreadyConnectedException ace) { } catch (AlreadyConnectedException ace) {
// Correct behavior // Correct behavior
} }
// Read a reply // Read a reply
bb.flip(); bb.flip();
log.println("Actor waiting to read");
dc.read(bb); dc.read(bb);
bb.flip(); bb.flip();
CharBuffer cb = Charset.forName("US-ASCII"). CharBuffer cb = StandardCharsets.US_ASCII.
newDecoder().decode(bb); newDecoder().decode(bb);
log.println("From Reactor: "+isa+ " said " +cb); log.println("Actor received from Reactor at " + socketAddress + ": " + cb);
// Clean up
dc.disconnect();
dc.close();
} catch (Exception ex) { } catch (Exception ex) {
e = ex; log.println("Actor threw exception: " + ex);
} throw new RuntimeException(ex);
} finally {
log.println("Actor finished");
} }
} }
public static class Reactor implements Sprintable { @Override
public void close() throws IOException {
dc.close();
}
}
public static class Reactor implements AutoCloseable, Runnable {
final DatagramChannel dc; final DatagramChannel dc;
Exception e = null;
Reactor() throws IOException { Reactor() throws IOException {
dc = DatagramChannel.open().bind(new InetSocketAddress(0)); dc = DatagramChannel.open().bind(new InetSocketAddress(0));
} }
int port() { SocketAddress getSocketAddress() throws IOException {
return dc.socket().getLocalPort(); return dc.getLocalAddress();
}
public void throwException() throws Exception {
if (e != null)
throw e;
} }
public void run() { public void run() {
try { try {
// Listen for a message // Listen for a message
ByteBuffer bb = ByteBuffer.allocateDirect(100); ByteBuffer bb = ByteBuffer.allocateDirect(100);
log.println("Reactor waiting to receive");
SocketAddress sa = dc.receive(bb); SocketAddress sa = dc.receive(bb);
bb.flip(); bb.flip();
CharBuffer cb = Charset.forName("US-ASCII"). CharBuffer cb = StandardCharsets.US_ASCII.
newDecoder().decode(bb); newDecoder().decode(bb);
log.println("From Actor: "+sa+ " said " +cb); log.println("Reactor received from Actor at" + sa + ": " + cb);
// Reply to sender // Reply to sender
dc.connect(sa); dc.connect(sa);
bb.flip(); bb.flip();
log.println("Reactor attempting to write: " + dc.getRemoteAddress().toString());
dc.write(bb); dc.write(bb);
// Clean up
dc.disconnect();
dc.close();
} catch (Exception ex) { } catch (Exception ex) {
e = ex; log.println("Reactor threw exception: " + ex);
} throw new RuntimeException(ex);
} finally {
log.println("Reactor finished");
}
}
@Override
public void close() throws IOException {
dc.close();
} }
} }
} }