mirror of
https://github.com/openjdk/jdk.git
synced 2025-09-22 12:04:39 +02:00
8206001: Enable TLS1.3 by default in Http Client
Reviewed-by: dfuchs
This commit is contained in:
parent
2e9d5e3d8a
commit
5cbda815d0
19 changed files with 383 additions and 72 deletions
|
@ -27,6 +27,7 @@ package jdk.internal.net.http;
|
|||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import javax.net.ssl.SNIHostName;
|
||||
|
@ -89,11 +90,30 @@ abstract class AbstractAsyncSSLConnection extends HttpConnection
|
|||
|
||||
final SSLEngine getEngine() { return engine; }
|
||||
|
||||
private static boolean contains(String[] rr, String target) {
|
||||
for (String s : rr)
|
||||
if (target.equalsIgnoreCase(s))
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
private static SSLParameters createSSLParameters(HttpClientImpl client,
|
||||
ServerName serverName,
|
||||
String[] alpn) {
|
||||
SSLParameters sslp = client.sslParameters();
|
||||
SSLParameters sslParameters = Utils.copySSLParameters(sslp);
|
||||
// filter out unwanted protocols, if h2 only
|
||||
if (alpn != null && alpn.length != 0 && !contains(alpn, "http/1.1")) {
|
||||
ArrayDeque<String> l = new ArrayDeque<>();
|
||||
for (String proto : sslParameters.getProtocols()) {
|
||||
if (!proto.startsWith("SSL") && !proto.endsWith("v1.1") && !proto.endsWith("v1")) {
|
||||
l.add(proto);
|
||||
}
|
||||
}
|
||||
String[] a1 = l.toArray(new String[0]);
|
||||
sslParameters.setProtocols(a1);
|
||||
}
|
||||
|
||||
if (!disableHostnameVerification)
|
||||
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
|
||||
if (alpn != null) {
|
||||
|
@ -112,10 +132,12 @@ abstract class AbstractAsyncSSLConnection extends HttpConnection
|
|||
return sslParameters;
|
||||
}
|
||||
|
||||
|
||||
private static SSLEngine createEngine(SSLContext context, String serverName, int port,
|
||||
SSLParameters sslParameters) {
|
||||
SSLEngine engine = context.createSSLEngine(serverName, port);
|
||||
engine.setUseClientMode(true);
|
||||
|
||||
engine.setSSLParameters(sslParameters);
|
||||
return engine;
|
||||
}
|
||||
|
|
|
@ -316,7 +316,7 @@ final class Exchange<T> {
|
|||
proxyResponse.version, true);
|
||||
return MinimalFuture.completedFuture(syntheticResponse);
|
||||
} else if (t != null) {
|
||||
if (debug.on()) debug.log("checkFor407: no response - %s", t);
|
||||
if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
|
||||
return MinimalFuture.failedFuture(t);
|
||||
} else {
|
||||
if (debug.on()) debug.log("checkFor407: all clear");
|
||||
|
|
|
@ -386,8 +386,11 @@ class Http1AsyncReceiver {
|
|||
// we have a flow List<ByteBuffer> upstream.
|
||||
Http1AsyncDelegateSubscription subscription =
|
||||
new Http1AsyncDelegateSubscription(scheduler, cancel, onSubscriptionError);
|
||||
try {
|
||||
pending.onSubscribe(subscription);
|
||||
} finally {
|
||||
this.delegate = delegate = pending;
|
||||
}
|
||||
final Object captured = delegate;
|
||||
if (debug.on())
|
||||
debug.log("delegate is now " + captured
|
||||
|
@ -485,6 +488,7 @@ class Http1AsyncReceiver {
|
|||
error = ex;
|
||||
}
|
||||
}
|
||||
|
||||
final Throwable t = (recorded == null ? ex : recorded);
|
||||
if (debug.on())
|
||||
debug.log("recorded " + t + "\n\t delegate: " + delegate
|
||||
|
|
|
@ -257,6 +257,14 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
|
|||
.thenCompose(unused -> {
|
||||
CompletableFuture<Void> cf = new MinimalFuture<>();
|
||||
try {
|
||||
asyncReceiver.whenFinished.whenComplete((r,t) -> {
|
||||
if (t != null) {
|
||||
if (debug.on())
|
||||
debug.log("asyncReceiver finished (failed=%s)", (Object)t);
|
||||
if (!headersSentCF.isDone())
|
||||
headersSentCF.completeAsync(() -> this, executor);
|
||||
}
|
||||
});
|
||||
connectFlows(connection);
|
||||
|
||||
if (debug.on()) debug.log("requestAction.headers");
|
||||
|
@ -282,7 +290,8 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
|
|||
|
||||
private void cancelIfFailed(Flow.Subscription s) {
|
||||
asyncReceiver.whenFinished.whenCompleteAsync((r,t) -> {
|
||||
if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
|
||||
if (debug.on())
|
||||
debug.log("asyncReceiver finished (failed=%s)", (Object)t);
|
||||
if (t != null) {
|
||||
s.cancel();
|
||||
// Don't complete exceptionally here as 't'
|
||||
|
|
|
@ -673,7 +673,11 @@ class Http2Connection {
|
|||
client2.deleteConnection(this);
|
||||
List<Stream<?>> c = new LinkedList<>(streams.values());
|
||||
for (Stream<?> s : c) {
|
||||
try {
|
||||
s.connectionClosing(t);
|
||||
} catch (Throwable e) {
|
||||
Log.logError("Failed to close stream {0}: {1}", s.streamid, e);
|
||||
}
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
|
@ -738,6 +742,9 @@ class Http2Connection {
|
|||
}
|
||||
|
||||
if (!(frame instanceof ResetFrame)) {
|
||||
if (frame instanceof DataFrame) {
|
||||
dropDataFrame((DataFrame)frame);
|
||||
}
|
||||
if (isServerInitiatedStream(streamid)) {
|
||||
if (streamid < nextPushStream) {
|
||||
// trailing data on a cancelled push promise stream,
|
||||
|
@ -776,6 +783,27 @@ class Http2Connection {
|
|||
}
|
||||
}
|
||||
|
||||
final void dropDataFrame(DataFrame df) {
|
||||
if (closed) return;
|
||||
if (debug.on()) {
|
||||
debug.log("Dropping data frame for stream %d (%d payload bytes)",
|
||||
df.streamid(), df.payloadLength());
|
||||
}
|
||||
ensureWindowUpdated(df);
|
||||
}
|
||||
|
||||
final void ensureWindowUpdated(DataFrame df) {
|
||||
try {
|
||||
if (closed) return;
|
||||
int length = df.payloadLength();
|
||||
if (length > 0) {
|
||||
windowUpdater.update(length);
|
||||
}
|
||||
} catch(Throwable t) {
|
||||
Log.logError("Unexpected exception while updating window: {0}", (Object)t);
|
||||
}
|
||||
}
|
||||
|
||||
private <T> void handlePushPromise(Stream<T> parent, PushPromiseFrame pp)
|
||||
throws IOException
|
||||
{
|
||||
|
@ -984,7 +1012,6 @@ class Http2Connection {
|
|||
connection.channel().getLocalAddress(),
|
||||
connection.address());
|
||||
SettingsFrame sf = new SettingsFrame(clientSettings);
|
||||
int initialWindowSize = sf.getParameter(INITIAL_WINDOW_SIZE);
|
||||
ByteBuffer buf = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
|
||||
Log.logFrames(sf, "OUT");
|
||||
// send preface bytes and SettingsFrame together
|
||||
|
@ -997,9 +1024,20 @@ class Http2Connection {
|
|||
Log.logTrace("Settings Frame sent");
|
||||
|
||||
// send a Window update for the receive buffer we are using
|
||||
// minus the initial 64 K specified in protocol
|
||||
final int len = windowUpdater.initialWindowSize - initialWindowSize;
|
||||
if (len > 0) {
|
||||
// minus the initial 64 K -1 specified in protocol:
|
||||
// RFC 7540, Section 6.9.2:
|
||||
// "[...] the connection flow-control window is set to the default
|
||||
// initial window size until a WINDOW_UPDATE frame is received."
|
||||
//
|
||||
// Note that the default initial window size, not to be confused
|
||||
// with the initial window size, is defined by RFC 7540 as
|
||||
// 64K -1.
|
||||
final int len = windowUpdater.initialWindowSize - DEFAULT_INITIAL_WINDOW_SIZE;
|
||||
if (len != 0) {
|
||||
if (Log.channel()) {
|
||||
Log.logChannel("Sending initial connection window update frame: {0} ({1} - {2})",
|
||||
len, windowUpdater.initialWindowSize, DEFAULT_INITIAL_WINDOW_SIZE);
|
||||
}
|
||||
windowUpdater.sendWindowUpdate(len);
|
||||
}
|
||||
// there will be an ACK to the windows update - which should
|
||||
|
@ -1132,6 +1170,7 @@ class Http2Connection {
|
|||
|
||||
private Stream<?> registerNewStream(OutgoingHeaders<Stream<?>> oh) {
|
||||
Stream<?> stream = oh.getAttachment();
|
||||
assert stream.streamid == 0;
|
||||
int streamid = nextstreamid;
|
||||
nextstreamid += 2;
|
||||
stream.registerStream(streamid);
|
||||
|
|
|
@ -329,6 +329,17 @@ final class HttpClientImpl extends HttpClient implements Trackable {
|
|||
|
||||
private static SSLParameters getDefaultParams(SSLContext ctx) {
|
||||
SSLParameters params = ctx.getSupportedSSLParameters();
|
||||
String[] protocols = params.getProtocols();
|
||||
boolean found13 = false;
|
||||
for (String proto : protocols) {
|
||||
if (proto.equals("TLSv1.3")) {
|
||||
found13 = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found13)
|
||||
params.setProtocols(new String[] {"TLSv1.3", "TLSv1.2"});
|
||||
else
|
||||
params.setProtocols(new String[] {"TLSv1.2"});
|
||||
return params;
|
||||
}
|
||||
|
|
|
@ -360,7 +360,6 @@ final class SocketTube implements FlowTube {
|
|||
}
|
||||
} catch (Throwable t) {
|
||||
signalError(t);
|
||||
subscription.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -424,6 +423,8 @@ final class SocketTube implements FlowTube {
|
|||
}
|
||||
completed = true;
|
||||
readPublisher.signalError(error);
|
||||
Flow.Subscription subscription = this.subscription;
|
||||
if (subscription != null) subscription.cancel();
|
||||
}
|
||||
|
||||
// A repeatable WriteEvent which is paused after firing and can
|
||||
|
@ -468,7 +469,11 @@ final class SocketTube implements FlowTube {
|
|||
|
||||
@Override
|
||||
public void cancel() {
|
||||
if (cancelled) return;
|
||||
if (debug.on()) debug.log("write: cancel");
|
||||
if (Log.channel()) {
|
||||
Log.logChannel("Cancelling write subscription");
|
||||
}
|
||||
dropSubscription();
|
||||
upstreamSubscription.cancel();
|
||||
}
|
||||
|
@ -503,9 +508,7 @@ final class SocketTube implements FlowTube {
|
|||
} catch (Throwable t) {
|
||||
if (debug.on())
|
||||
debug.log("write: error while requesting more: " + t);
|
||||
cancelled = true;
|
||||
signalError(t);
|
||||
subscription.cancel();
|
||||
} finally {
|
||||
debugState("leaving requestMore: ");
|
||||
}
|
||||
|
|
|
@ -185,6 +185,7 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
int size = Utils.remaining(dsts, Integer.MAX_VALUE);
|
||||
if (size == 0 && finished) {
|
||||
inputQ.remove();
|
||||
connection.ensureWindowUpdated(df); // must update connection window
|
||||
Log.logTrace("responseSubscriber.onComplete");
|
||||
if (debug.on()) debug.log("incoming: onComplete");
|
||||
sched.stop();
|
||||
|
@ -197,7 +198,12 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
inputQ.remove();
|
||||
Log.logTrace("responseSubscriber.onNext {0}", size);
|
||||
if (debug.on()) debug.log("incoming: onNext(%d)", size);
|
||||
try {
|
||||
subscriber.onNext(dsts);
|
||||
} catch (Throwable t) {
|
||||
connection.dropDataFrame(df); // must update connection window
|
||||
throw t;
|
||||
}
|
||||
if (consumed(df)) {
|
||||
Log.logTrace("responseSubscriber.onComplete");
|
||||
if (debug.on()) debug.log("incoming: onComplete");
|
||||
|
@ -215,6 +221,8 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
}
|
||||
} catch (Throwable throwable) {
|
||||
errorRef.compareAndSet(null, throwable);
|
||||
} finally {
|
||||
if (sched.isStopped()) drainInputQueue();
|
||||
}
|
||||
|
||||
Throwable t = errorRef.get();
|
||||
|
@ -233,10 +241,25 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
Log.logError("Subscriber::onError threw exception: {0}", (Object) t);
|
||||
} finally {
|
||||
cancelImpl(t);
|
||||
drainInputQueue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// must only be called from the scheduler schedule() loop.
|
||||
// ensure that all received data frames are accounted for
|
||||
// in the connection window flow control if the scheduler
|
||||
// is stopped before all the data is consumed.
|
||||
private void drainInputQueue() {
|
||||
Http2Frame frame;
|
||||
while ((frame = inputQ.poll()) != null) {
|
||||
if (frame instanceof DataFrame) {
|
||||
connection.dropDataFrame((DataFrame)frame);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Callback invoked after the Response BodySubscriber has consumed the
|
||||
// buffers contained in a DataFrame.
|
||||
// Returns true if END_STREAM is reached, false otherwise.
|
||||
|
@ -245,15 +268,19 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
// The entire DATA frame payload is included in flow control,
|
||||
// including the Pad Length and Padding fields if present
|
||||
int len = df.payloadLength();
|
||||
boolean endStream = df.getFlag(DataFrame.END_STREAM);
|
||||
if (len == 0) return endStream;
|
||||
|
||||
connection.windowUpdater.update(len);
|
||||
|
||||
if (!df.getFlag(DataFrame.END_STREAM)) {
|
||||
if (!endStream) {
|
||||
// Don't send window update on a stream which is
|
||||
// closed or half closed.
|
||||
windowUpdater.update(len);
|
||||
return false; // more data coming
|
||||
}
|
||||
return true; // end of stream
|
||||
|
||||
// true: end of stream; false: more data coming
|
||||
return endStream;
|
||||
}
|
||||
|
||||
boolean deRegister() {
|
||||
|
@ -500,8 +527,8 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
{
|
||||
int amount = frame.getUpdate();
|
||||
if (amount <= 0) {
|
||||
Log.logTrace("Resetting stream: {0} %d, Window Update amount: %d\n",
|
||||
streamid, streamid, amount);
|
||||
Log.logTrace("Resetting stream: {0}, Window Update amount: {1}",
|
||||
streamid, amount);
|
||||
connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
|
||||
} else {
|
||||
assert streamid != 0;
|
||||
|
@ -1126,7 +1153,7 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
connection.resetStream(streamid, ResetFrame.CANCEL);
|
||||
}
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
} catch (Throwable ex) {
|
||||
Log.logError(ex);
|
||||
}
|
||||
}
|
||||
|
@ -1289,6 +1316,18 @@ class Stream<T> extends ExchangeImpl<T> {
|
|||
int getStreamId() {
|
||||
return streamid;
|
||||
}
|
||||
|
||||
@Override
|
||||
String dbgString() {
|
||||
String dbg = dbgString;
|
||||
if (dbg != null) return dbg;
|
||||
if (streamid == 0) {
|
||||
return connection.dbgString() + ":WindowUpdateSender(stream: ?)";
|
||||
} else {
|
||||
dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamid + ")";
|
||||
return dbgString = dbg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
|
||||
package jdk.internal.net.http;
|
||||
|
||||
import jdk.internal.net.http.common.FlowTube;
|
||||
import jdk.internal.net.http.common.Logger;
|
||||
import jdk.internal.net.http.frame.SettingsFrame;
|
||||
import jdk.internal.net.http.frame.WindowUpdateFrame;
|
||||
|
@ -66,8 +67,9 @@ abstract class WindowUpdateSender {
|
|||
abstract int getStreamId();
|
||||
|
||||
void update(int delta) {
|
||||
if (debug.on()) debug.log("update: %d", delta);
|
||||
if (received.addAndGet(delta) > limit) {
|
||||
int rcv = received.addAndGet(delta);
|
||||
if (debug.on()) debug.log("update: %d, received: %d, limit: %d", delta, rcv, limit);
|
||||
if (rcv > limit) {
|
||||
synchronized (this) {
|
||||
int tosend = received.get();
|
||||
if( tosend > limit) {
|
||||
|
@ -83,8 +85,18 @@ abstract class WindowUpdateSender {
|
|||
connection.sendUnorderedFrame(new WindowUpdateFrame(getStreamId(), delta));
|
||||
}
|
||||
|
||||
volatile String dbgString;
|
||||
String dbgString() {
|
||||
String dbg = dbgString;
|
||||
if (dbg != null) return dbg;
|
||||
FlowTube tube = connection.connection.getConnectionFlow();
|
||||
if (tube == null) {
|
||||
return "WindowUpdateSender(stream: " + getStreamId() + ")";
|
||||
} else {
|
||||
int streamId = getStreamId();
|
||||
dbg = connection.dbgString() + ":WindowUpdateSender(stream: " + streamId + ")";
|
||||
return streamId == 0 ? dbg : (dbgString = dbg);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -33,6 +33,9 @@ import javax.net.ssl.SSLEngineResult.HandshakeStatus;
|
|||
import javax.net.ssl.SSLEngineResult.Status;
|
||||
import javax.net.ssl.SSLException;
|
||||
import java.io.IOException;
|
||||
import java.lang.ref.Reference;
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -93,6 +96,8 @@ public class SSLFlowDelegate {
|
|||
// When handshake is in progress trying to wrap may produce no bytes.
|
||||
private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
|
||||
private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
|
||||
private static final boolean isMonitored =
|
||||
monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"));
|
||||
|
||||
final Executor exec;
|
||||
final Reader reader;
|
||||
|
@ -100,6 +105,7 @@ public class SSLFlowDelegate {
|
|||
final SSLEngine engine;
|
||||
final String tubeName; // hack
|
||||
final CompletableFuture<String> alpnCF; // completes on initial handshake
|
||||
final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped
|
||||
volatile boolean close_notify_received;
|
||||
final CompletableFuture<Void> readerCF;
|
||||
final CompletableFuture<Void> writerCF;
|
||||
|
@ -152,8 +158,7 @@ public class SSLFlowDelegate {
|
|||
// Writer to the downWriter.
|
||||
connect(downReader, downWriter);
|
||||
|
||||
if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
|
||||
Monitor.add(this::monitor);
|
||||
if (isMonitored) Monitor.add(monitor);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -202,6 +207,7 @@ public class SSLFlowDelegate {
|
|||
public String monitor() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("SSL: id ").append(id);
|
||||
sb.append(" ").append(dbgString());
|
||||
sb.append(" HS state: " + states(handshakeState));
|
||||
sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
|
||||
if (stateList != null) {
|
||||
|
@ -293,8 +299,10 @@ public class SSLFlowDelegate {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "READER: " + super.toString() + " readBuf: " + readBuf.toString()
|
||||
+ " count: " + count.toString();
|
||||
return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
|
||||
+ ", count: " + count.toString() + ", scheduler: "
|
||||
+ (scheduler.isStopped() ? "stopped" : "running")
|
||||
+ ", status: " + lastUnwrapStatus;
|
||||
}
|
||||
|
||||
private void reallocReadBuf() {
|
||||
|
@ -335,6 +343,7 @@ public class SSLFlowDelegate {
|
|||
}
|
||||
if (complete) {
|
||||
this.completing = complete;
|
||||
minBytesRequired = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -395,13 +404,23 @@ public class SSLFlowDelegate {
|
|||
// not enough data in the read buffer...
|
||||
// no need to try to unwrap again unless we get more bytes
|
||||
// than minBytesRequired = len in the read buffer.
|
||||
minBytesRequired = len;
|
||||
synchronized (readBufferLock) {
|
||||
minBytesRequired = len;
|
||||
// more bytes could already have been added...
|
||||
assert readBuf.remaining() >= len;
|
||||
// check if we have received some data, and if so
|
||||
// we can just re-spin the loop
|
||||
if (readBuf.remaining() > len) continue;
|
||||
else if (this.completing) {
|
||||
if (debug.on()) {
|
||||
debugr.log("BUFFER_UNDERFLOW with EOF," +
|
||||
" %d bytes non decrypted.", len);
|
||||
}
|
||||
// The channel won't send us any more data, and
|
||||
// we are in underflow: we need to fail.
|
||||
throw new IOException("BUFFER_UNDERFLOW with EOF, "
|
||||
+ len + " bytes non decrypted.");
|
||||
}
|
||||
}
|
||||
// request more data and return.
|
||||
requestMore();
|
||||
|
@ -429,6 +448,7 @@ public class SSLFlowDelegate {
|
|||
} catch (IOException ex) {
|
||||
errorCommon(ex);
|
||||
handleError(ex);
|
||||
return;
|
||||
}
|
||||
if (handshaking && !complete)
|
||||
return;
|
||||
|
@ -452,12 +472,13 @@ public class SSLFlowDelegate {
|
|||
}
|
||||
}
|
||||
|
||||
private volatile Status lastUnwrapStatus;
|
||||
EngineResult unwrapBuffer(ByteBuffer src) throws IOException {
|
||||
ByteBuffer dst = getAppBuffer();
|
||||
int len = src.remaining();
|
||||
while (true) {
|
||||
SSLEngineResult sslResult = engine.unwrap(src, dst);
|
||||
switch (sslResult.getStatus()) {
|
||||
switch (lastUnwrapStatus = sslResult.getStatus()) {
|
||||
case BUFFER_OVERFLOW:
|
||||
// may happen if app size buffer was changed, or if
|
||||
// our 'adaptiveBufferSize' guess was too small for
|
||||
|
@ -507,7 +528,9 @@ public class SSLFlowDelegate {
|
|||
}
|
||||
|
||||
public static class Monitor extends Thread {
|
||||
final List<Monitorable> list;
|
||||
final List<WeakReference<Monitorable>> list;
|
||||
final List<FinalMonitorable> finalList;
|
||||
final ReferenceQueue<Monitorable> queue = new ReferenceQueue<>();
|
||||
static Monitor themon;
|
||||
|
||||
static {
|
||||
|
@ -515,19 +538,61 @@ public class SSLFlowDelegate {
|
|||
themon.start(); // uncomment to enable Monitor
|
||||
}
|
||||
|
||||
// An instance used to temporarily store the
|
||||
// last observable state of a monitorable object.
|
||||
// When Monitor.remove(o) is called, we replace
|
||||
// 'o' with a FinalMonitorable whose reference
|
||||
// will be enqueued after the last observable state
|
||||
// has been printed.
|
||||
final class FinalMonitorable implements Monitorable {
|
||||
final String finalState;
|
||||
FinalMonitorable(Monitorable o) {
|
||||
finalState = o.getInfo();
|
||||
finalList.add(this);
|
||||
}
|
||||
@Override
|
||||
public String getInfo() {
|
||||
finalList.remove(this);
|
||||
return finalState;
|
||||
}
|
||||
}
|
||||
|
||||
Monitor() {
|
||||
super("Monitor");
|
||||
setDaemon(true);
|
||||
list = Collections.synchronizedList(new LinkedList<>());
|
||||
finalList = new ArrayList<>(); // access is synchronized on list above
|
||||
}
|
||||
|
||||
void addTarget(Monitorable o) {
|
||||
list.add(o);
|
||||
list.add(new WeakReference<>(o, queue));
|
||||
}
|
||||
void removeTarget(Monitorable o) {
|
||||
// It can take a long time for GC to clean up references.
|
||||
// Calling Monitor.remove() early helps removing noise from the
|
||||
// logs/
|
||||
synchronized (list) {
|
||||
Iterator<WeakReference<Monitorable>> it = list.iterator();
|
||||
while (it.hasNext()) {
|
||||
Monitorable m = it.next().get();
|
||||
if (m == null) it.remove();
|
||||
if (o == m) {
|
||||
it.remove();
|
||||
break;
|
||||
}
|
||||
}
|
||||
FinalMonitorable m = new FinalMonitorable(o);
|
||||
addTarget(m);
|
||||
Reference.reachabilityFence(m);
|
||||
}
|
||||
}
|
||||
|
||||
public static void add(Monitorable o) {
|
||||
themon.addTarget(o);
|
||||
}
|
||||
public static void remove(Monitorable o) {
|
||||
themon.removeTarget(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -536,7 +601,14 @@ public class SSLFlowDelegate {
|
|||
while (true) {
|
||||
Thread.sleep(20 * 1000);
|
||||
synchronized (list) {
|
||||
for (Monitorable o : list) {
|
||||
Reference<? extends Monitorable> expired;
|
||||
while ((expired = queue.poll()) != null) list.remove(expired);
|
||||
for (WeakReference<Monitorable> ref : list) {
|
||||
Monitorable o = ref.get();
|
||||
if (o == null) continue;
|
||||
if (o instanceof FinalMonitorable) {
|
||||
ref.enqueue();
|
||||
}
|
||||
System.out.println(o.getInfo());
|
||||
System.out.println("-------------------------");
|
||||
}
|
||||
|
@ -733,6 +805,7 @@ public class SSLFlowDelegate {
|
|||
// downstream. Otherwise, we send the writeBuffer downstream
|
||||
// and will allocate a new one next time.
|
||||
volatile ByteBuffer writeBuffer;
|
||||
private volatile Status lastWrappedStatus;
|
||||
@SuppressWarnings("fallthrough")
|
||||
EngineResult wrapBuffers(ByteBuffer[] src) throws SSLException {
|
||||
long len = Utils.remaining(src);
|
||||
|
@ -747,7 +820,7 @@ public class SSLFlowDelegate {
|
|||
while (true) {
|
||||
SSLEngineResult sslResult = engine.wrap(src, dst);
|
||||
if (debugw.on()) debugw.log("SSLResult: " + sslResult);
|
||||
switch (sslResult.getStatus()) {
|
||||
switch (lastWrappedStatus = sslResult.getStatus()) {
|
||||
case BUFFER_OVERFLOW:
|
||||
// Shouldn't happen. We allocated buffer with packet size
|
||||
// get it again if net buffer size was changed
|
||||
|
@ -815,8 +888,10 @@ public class SSLFlowDelegate {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "WRITER: " + super.toString() +
|
||||
" writeList size " + Integer.toString(writeList.size());
|
||||
return "WRITER: " + super.toString()
|
||||
+ ", writeList size: " + Integer.toString(writeList.size())
|
||||
+ ", scheduler: " + (scheduler.isStopped() ? "stopped" : "running")
|
||||
+ ", status: " + lastWrappedStatus;
|
||||
//" writeList: " + writeList.toString();
|
||||
}
|
||||
}
|
||||
|
@ -839,6 +914,7 @@ public class SSLFlowDelegate {
|
|||
stopped = true;
|
||||
reader.stop();
|
||||
writer.stop();
|
||||
if (isMonitored) Monitor.remove(monitor);
|
||||
}
|
||||
|
||||
private Void stopOnError(Throwable currentlyUnused) {
|
||||
|
@ -953,6 +1029,10 @@ public class SSLFlowDelegate {
|
|||
case NEED_UNWRAP_AGAIN:
|
||||
// do nothing else
|
||||
// receiving-side data will trigger unwrap
|
||||
if (caller == WRITER) {
|
||||
reader.schedule();
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new InternalError("Unexpected handshake status:"
|
||||
|
|
|
@ -406,6 +406,21 @@ public class SSLTube implements FlowTube {
|
|||
}
|
||||
}
|
||||
|
||||
private void complete(DelegateWrapper subscriberImpl, Throwable t) {
|
||||
try {
|
||||
if (t == null) subscriberImpl.onComplete();
|
||||
else subscriberImpl.onError(t);
|
||||
if (debug.on()) {
|
||||
debug.log("subscriber completed %s"
|
||||
+ ((t == null) ? "normally" : ("with error: " + t)));
|
||||
}
|
||||
} finally {
|
||||
// Error or EOF while reading:
|
||||
// cancel write side after completing read side
|
||||
writeSubscription.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
private void onNewSubscription(DelegateWrapper subscriberImpl,
|
||||
Flow.Subscription subscription) {
|
||||
assert subscriberImpl != null;
|
||||
|
@ -432,13 +447,13 @@ public class SSLTube implements FlowTube {
|
|||
if (debug.on())
|
||||
debug.log("onNewSubscription: subscriberImpl:%s, invoking onError:%s",
|
||||
subscriberImpl, failed);
|
||||
subscriberImpl.onError(failed);
|
||||
complete(subscriberImpl, failed);
|
||||
} else if (completed) {
|
||||
if (debug.on())
|
||||
debug.log("onNewSubscription: subscriberImpl:%s, invoking onCompleted",
|
||||
subscriberImpl);
|
||||
finished = true;
|
||||
subscriberImpl.onComplete();
|
||||
complete(subscriberImpl, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -463,7 +478,7 @@ public class SSLTube implements FlowTube {
|
|||
subscriberImpl = subscribed;
|
||||
}
|
||||
if (subscriberImpl != null) {
|
||||
subscriberImpl.onError(failed);
|
||||
complete(subscriberImpl, failed);
|
||||
} else {
|
||||
if (debug.on())
|
||||
debug.log("%s: delegate null, stored %s", this, failed);
|
||||
|
@ -485,14 +500,22 @@ public class SSLTube implements FlowTube {
|
|||
return !(hs == NOT_HANDSHAKING || hs == FINISHED);
|
||||
}
|
||||
|
||||
private boolean handshakeFailed() {
|
||||
private String handshakeFailed() {
|
||||
// sslDelegate can be null if we reach here
|
||||
// during the initial handshake, as that happens
|
||||
// within the SSLFlowDelegate constructor.
|
||||
// In that case we will want to raise an exception.
|
||||
return handshaking()
|
||||
if (handshaking()
|
||||
&& (sslDelegate == null
|
||||
|| !sslDelegate.closeNotifyReceived());
|
||||
|| !sslDelegate.closeNotifyReceived())) {
|
||||
return "Remote host terminated the handshake";
|
||||
}
|
||||
// The initial handshake may not have been started yet.
|
||||
// In which case - if we are completed before the initial handshake
|
||||
// is started, we consider this a handshake failure as well.
|
||||
if ("SSL_NULL_WITH_NULL_NULL".equals(engine.getSession().getCipherSuite()))
|
||||
return "Remote host closed the channel";
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -503,17 +526,18 @@ public class SSLTube implements FlowTube {
|
|||
subscriberImpl = subscribed;
|
||||
}
|
||||
|
||||
if (handshakeFailed()) {
|
||||
String handshakeFailed = handshakeFailed();
|
||||
if (handshakeFailed != null) {
|
||||
if (debug.on())
|
||||
debug.log("handshake: %s, inbound done: %s outbound done: %s",
|
||||
debug.log("handshake: %s, inbound done: %s, outbound done: %s: %s",
|
||||
engine.getHandshakeStatus(),
|
||||
engine.isInboundDone(),
|
||||
engine.isOutboundDone());
|
||||
onErrorImpl(new SSLHandshakeException(
|
||||
"Remote host terminated the handshake"));
|
||||
engine.isOutboundDone(),
|
||||
handshakeFailed);
|
||||
onErrorImpl(new SSLHandshakeException(handshakeFailed));
|
||||
} else if (subscriberImpl != null) {
|
||||
onCompleteReceived = finished = true;
|
||||
subscriberImpl.onComplete();
|
||||
complete(subscriberImpl, null);
|
||||
} else {
|
||||
onCompleteReceived = true;
|
||||
}
|
||||
|
|
|
@ -161,14 +161,19 @@ public class SettingsFrame extends Http2Frame {
|
|||
}
|
||||
}
|
||||
|
||||
public static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * K -1;
|
||||
public static final int DEFAULT_HEADER_TABLE_SIZE = 4 * K;
|
||||
public static final int DEFAULT_MAX_CONCURRENT_STREAMS = 100;
|
||||
public static final int DEFAULT_MAX_FRAME_SIZE = 16 * K;
|
||||
|
||||
public static SettingsFrame getDefaultSettings() {
|
||||
SettingsFrame f = new SettingsFrame();
|
||||
// TODO: check these values
|
||||
f.setParameter(ENABLE_PUSH, 1);
|
||||
f.setParameter(HEADER_TABLE_SIZE, 4 * K);
|
||||
f.setParameter(MAX_CONCURRENT_STREAMS, 100);
|
||||
f.setParameter(INITIAL_WINDOW_SIZE, 64 * K - 1);
|
||||
f.setParameter(MAX_FRAME_SIZE, 16 * K);
|
||||
f.setParameter(HEADER_TABLE_SIZE, DEFAULT_HEADER_TABLE_SIZE);
|
||||
f.setParameter(MAX_CONCURRENT_STREAMS, DEFAULT_MAX_CONCURRENT_STREAMS);
|
||||
f.setParameter(INITIAL_WINDOW_SIZE, DEFAULT_INITIAL_WINDOW_SIZE);
|
||||
f.setParameter(MAX_FRAME_SIZE, DEFAULT_MAX_FRAME_SIZE);
|
||||
return f;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -339,9 +339,11 @@ public class CancelledResponse {
|
|||
Thread.sleep(10);
|
||||
}
|
||||
out.println("sent " + s);
|
||||
} catch (SSLException | SocketException x) {
|
||||
// if SSL then we might get a "Broken Pipe", otherwise
|
||||
// a "Socket closed".
|
||||
} catch (SSLException | SocketException | RuntimeException x) {
|
||||
// if SSL then we might get a "Broken Pipe", or a
|
||||
// RuntimeException wrapping an InvalidAlgorithmParameterException
|
||||
// (probably if the channel is closed during the handshake),
|
||||
// otherwise we get a "Socket closed".
|
||||
boolean expected = cancelled.get();
|
||||
if (sent > 0 && expected) {
|
||||
System.out.println("Connection closed by peer as expected: " + x);
|
||||
|
@ -349,6 +351,7 @@ public class CancelledResponse {
|
|||
} else {
|
||||
System.out.println("Unexpected exception (sent="
|
||||
+ sent + ", cancelled=" + expected + "): " + x);
|
||||
if (x instanceof RuntimeException) throw (RuntimeException) x;
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
} catch (IOException | InterruptedException e) {
|
||||
|
|
|
@ -284,7 +284,7 @@ public class MockServer extends Thread implements Closeable {
|
|||
}
|
||||
try {
|
||||
socket.close();
|
||||
} catch (IOException e) {}
|
||||
} catch (Throwable e) {}
|
||||
synchronized (removals) {
|
||||
removals.add(this);
|
||||
}
|
||||
|
@ -339,7 +339,7 @@ public class MockServer extends Thread implements Closeable {
|
|||
closed = true;
|
||||
try {
|
||||
ss.close();
|
||||
} catch (IOException e) {
|
||||
} catch (Throwable e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
for (Connection c : sockets) {
|
||||
|
|
|
@ -264,19 +264,47 @@ public class ShortResponseBody {
|
|||
|
||||
// can be used to prolong request body publication
|
||||
static final class InfiniteInputStream extends InputStream {
|
||||
int count = 0;
|
||||
int k16 = 0;
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
if (++count == 1) {
|
||||
System.out.println("Start sending 1 byte");
|
||||
}
|
||||
if (count > 16 * 1024) {
|
||||
k16++;
|
||||
System.out.println("... 16K sent.");
|
||||
count = count % (16 * 1024);
|
||||
}
|
||||
if (k16 > 128) {
|
||||
System.out.println("WARNING: InfiniteInputStream: " +
|
||||
"more than 128 16k buffers generated: returning EOF");
|
||||
return -1;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] buf, int offset, int length) {
|
||||
//int count = offset;
|
||||
//length = Math.max(0, Math.min(buf.length - offset, length));
|
||||
length = Math.max(0, Math.min(buf.length - offset, length));
|
||||
//for (; count < length; count++)
|
||||
// buf[offset++] = 0x01;
|
||||
//return count;
|
||||
return Math.max(0, Math.min(buf.length - offset, length));
|
||||
if (count == 0) {
|
||||
System.out.println("Start sending " + length);
|
||||
} else if (count > 16 * 1024) {
|
||||
k16++;
|
||||
System.out.println("... 16K sent.");
|
||||
count = count % (16 * 1024);
|
||||
}
|
||||
if (k16 > 128) {
|
||||
System.out.println("WARNING: InfiniteInputStream: " +
|
||||
"more than 128 16k buffers generated: returning EOF");
|
||||
return -1;
|
||||
}
|
||||
count += length;
|
||||
return length;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -493,10 +521,13 @@ public class ShortResponseBody {
|
|||
out.print(requestMethod + " ");
|
||||
URI uriPath = readRequestPath(is);
|
||||
out.println(uriPath);
|
||||
readRequestHeaders(is);
|
||||
String headers = readRequestHeaders(is);
|
||||
|
||||
String query = uriPath.getRawQuery();
|
||||
assert query != null;
|
||||
if (query == null) {
|
||||
out.println("Request headers: [" + headers + "]");
|
||||
}
|
||||
assert query != null : "null query for uriPath: " + uriPath;
|
||||
String qv = query.split("=")[1];
|
||||
int len;
|
||||
if (qv.equals("all")) {
|
||||
|
@ -542,9 +573,11 @@ public class ShortResponseBody {
|
|||
}
|
||||
|
||||
// Read until the end of a HTTP request headers
|
||||
static void readRequestHeaders(InputStream is) throws IOException {
|
||||
static String readRequestHeaders(InputStream is) throws IOException {
|
||||
int requestEndCount = 0, r;
|
||||
StringBuilder sb = new StringBuilder();
|
||||
while ((r = is.read()) != -1) {
|
||||
sb.append((char) r);
|
||||
if (r == requestEnd[requestEndCount]) {
|
||||
requestEndCount++;
|
||||
if (requestEndCount == 4) {
|
||||
|
@ -554,6 +587,7 @@ public class ShortResponseBody {
|
|||
requestEndCount = 0;
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.util.List;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ServerSocketFactory;
|
||||
import javax.net.ssl.SSLException;
|
||||
import javax.net.ssl.SSLServerSocketFactory;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.HttpClient.Version;
|
||||
|
@ -268,7 +269,7 @@ public class SplitResponse {
|
|||
String onechar = s.substring(i, i + 1);
|
||||
try {
|
||||
conn.send(onechar);
|
||||
} catch(SocketException x) {
|
||||
} catch(SocketException | SSLException x) {
|
||||
if (!useSSL || i != len - 1) throw x;
|
||||
if (x.getMessage().contains("closed by remote host")) {
|
||||
String osname = System.getProperty("os.name", "unknown");
|
||||
|
|
|
@ -177,13 +177,24 @@ public class FixedThreadPoolTest {
|
|||
System.err.println("DONE");
|
||||
}
|
||||
|
||||
// expect highest supported version we know about
|
||||
static String expectedTLSVersion(SSLContext ctx) {
|
||||
SSLParameters params = ctx.getSupportedSSLParameters();
|
||||
String[] protocols = params.getProtocols();
|
||||
for (String prot : protocols) {
|
||||
if (prot.equals("TLSv1.3"))
|
||||
return "TLSv1.3";
|
||||
}
|
||||
return "TLSv1.2";
|
||||
}
|
||||
|
||||
static void paramsTest() throws Exception {
|
||||
System.err.println("paramsTest");
|
||||
Http2TestServer server = new Http2TestServer(true, 0, exec, sslContext);
|
||||
server.addHandler((t -> {
|
||||
SSLSession s = t.getSSLSession();
|
||||
String prot = s.getProtocol();
|
||||
if (prot.equals("TLSv1.2")) {
|
||||
if (prot.equals(expectedTLSVersion(sslContext))) {
|
||||
t.sendResponseHeaders(200, -1);
|
||||
} else {
|
||||
System.err.printf("Protocols =%s\n", prot);
|
||||
|
|
|
@ -32,6 +32,7 @@ import java.net.http.HttpClient;
|
|||
import java.net.http.HttpRequest;
|
||||
import java.net.http.HttpRequest.BodyPublishers;
|
||||
import java.net.http.HttpResponse.BodyHandlers;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLParameters;
|
||||
import javax.net.ssl.SSLSession;
|
||||
|
||||
|
@ -57,6 +58,19 @@ public class TLSConnection {
|
|||
|
||||
private static final SSLParameters USE_DEFAULT_SSL_PARAMETERS = new SSLParameters();
|
||||
|
||||
// expect highest supported version we know about
|
||||
static String expectedTLSVersion(SSLContext ctx) throws Exception {
|
||||
if (ctx == null)
|
||||
ctx = SSLContext.getDefault();
|
||||
SSLParameters params = ctx.getSupportedSSLParameters();
|
||||
String[] protocols = params.getProtocols();
|
||||
for (String prot : protocols) {
|
||||
if (prot.equals("TLSv1.3"))
|
||||
return "TLSv1.3";
|
||||
}
|
||||
return "TLSv1.2";
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// re-enable 3DES
|
||||
Security.setProperty("jdk.tls.disabledAlgorithms", "");
|
||||
|
@ -92,7 +106,7 @@ public class TLSConnection {
|
|||
"---\nTest #2: default SSL parameters, "
|
||||
+ "expect successful connection",
|
||||
() -> connect(uriString, USE_DEFAULT_SSL_PARAMETERS));
|
||||
success &= checkProtocol(handler.getSSLSession(), "TLSv1.2");
|
||||
success &= checkProtocol(handler.getSSLSession(), expectedTLSVersion(null));
|
||||
|
||||
// set SSL_DHE_RSA_WITH_3DES_EDE_CBC_SHA cipher suite
|
||||
// which has less priority in default cipher suite list
|
||||
|
|
|
@ -350,7 +350,7 @@ public class SSLEchoTubeTest extends AbstractSSLTubeTest {
|
|||
|
||||
@Override
|
||||
public void cancel() {
|
||||
cancelled.set(true);
|
||||
queue.add(EOF);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue