8217094: HttpClient SSL race if a socket IOException is raised before ALPN is available

The patch makes suer that the SSLFlowDelegate's ALPN CF is always completed

Reviewed-by: chegar
This commit is contained in:
Daniel Fuchs 2019-01-16 19:09:16 +00:00
parent f31819f7c5
commit 3b68bb2960
6 changed files with 360 additions and 20 deletions

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -32,6 +32,7 @@ import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.HandshakeStatus; import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException; import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import java.io.IOException; import java.io.IOException;
import java.lang.ref.Reference; import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue; import java.lang.ref.ReferenceQueue;
@ -109,6 +110,7 @@ public class SSLFlowDelegate {
volatile boolean close_notify_received; volatile boolean close_notify_received;
final CompletableFuture<Void> readerCF; final CompletableFuture<Void> readerCF;
final CompletableFuture<Void> writerCF; final CompletableFuture<Void> writerCF;
final CompletableFuture<Void> stopCF;
final Consumer<ByteBuffer> recycler; final Consumer<ByteBuffer> recycler;
static AtomicInteger scount = new AtomicInteger(1); static AtomicInteger scount = new AtomicInteger(1);
final int id; final int id;
@ -149,8 +151,7 @@ public class SSLFlowDelegate {
this.writerCF = reader.completion(); this.writerCF = reader.completion();
readerCF.exceptionally(this::stopOnError); readerCF.exceptionally(this::stopOnError);
writerCF.exceptionally(this::stopOnError); writerCF.exceptionally(this::stopOnError);
this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion())
CompletableFuture.allOf(reader.completion(), writer.completion())
.thenRun(this::normalStop); .thenRun(this::normalStop);
this.alpnCF = new MinimalFuture<>(); this.alpnCF = new MinimalFuture<>();
@ -302,7 +303,9 @@ public class SSLFlowDelegate {
return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() return "READER: " + super.toString() + ", readBuf: " + readBuf.toString()
+ ", count: " + count.toString() + ", scheduler: " + ", count: " + count.toString() + ", scheduler: "
+ (scheduler.isStopped() ? "stopped" : "running") + (scheduler.isStopped() ? "stopped" : "running")
+ ", status: " + lastUnwrapStatus; + ", status: " + lastUnwrapStatus
+ ", handshakeState: " + handshakeState.get()
+ ", engine: " + engine.getHandshakeStatus();
} }
private void reallocReadBuf() { private void reallocReadBuf() {
@ -429,6 +432,8 @@ public class SSLFlowDelegate {
if (complete && result.status() == Status.CLOSED) { if (complete && result.status() == Status.CLOSED) {
if (debugr.on()) debugr.log("Closed: completing"); if (debugr.on()) debugr.log("Closed: completing");
outgoing(Utils.EMPTY_BB_LIST, true); outgoing(Utils.EMPTY_BB_LIST, true);
// complete ALPN if not yet completed
setALPN();
return; return;
} }
if (result.handshaking()) { if (result.handshaking()) {
@ -437,11 +442,7 @@ public class SSLFlowDelegate {
if (doHandshake(result, READER)) continue; // need unwrap if (doHandshake(result, READER)) continue; // need unwrap
else break; // doHandshake will have triggered the write scheduler if necessary else break; // doHandshake will have triggered the write scheduler if necessary
} else { } else {
if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { if (trySetALPN()) {
handshaking = false;
applicationBufferSize = engine.getSession().getApplicationBufferSize();
packetBufferSize = engine.getSession().getPacketBufferSize();
setALPN();
resumeActivity(); resumeActivity();
} }
} }
@ -741,6 +742,8 @@ public class SSLFlowDelegate {
if (!upstreamCompleted) { if (!upstreamCompleted) {
upstreamCompleted = true; upstreamCompleted = true;
upstreamSubscription.cancel(); upstreamSubscription.cancel();
// complete ALPN if not yet completed
setALPN();
} }
if (result.bytesProduced() <= 0) if (result.bytesProduced() <= 0)
return; return;
@ -758,10 +761,7 @@ public class SSLFlowDelegate {
doHandshake(result, WRITER); // ok to ignore return doHandshake(result, WRITER); // ok to ignore return
handshaking = true; handshaking = true;
} else { } else {
if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { if (trySetALPN()) {
applicationBufferSize = engine.getSession().getApplicationBufferSize();
packetBufferSize = engine.getSession().getPacketBufferSize();
setALPN();
resumeActivity(); resumeActivity();
} }
} }
@ -914,11 +914,25 @@ public class SSLFlowDelegate {
stopped = true; stopped = true;
reader.stop(); reader.stop();
writer.stop(); writer.stop();
// make sure the alpnCF is completed.
if (!alpnCF.isDone()) {
Throwable alpn = new SSLHandshakeException(
"Connection closed before successful ALPN negotiation");
alpnCF.completeExceptionally(alpn);
}
if (isMonitored) Monitor.remove(monitor); if (isMonitored) Monitor.remove(monitor);
} }
private Void stopOnError(Throwable currentlyUnused) { private Void stopOnError(Throwable error) {
// maybe log, etc // maybe log, etc
// ensure the ALPN is completed
// We could also do this in SSLTube.SSLSubscriberWrapper
// onError/onComplete - with the caveat that the ALP CF
// would get completed externally. Doing it here keeps
// it all inside SSLFlowDelegate.
if (!alpnCF.isDone()) {
alpnCF.completeExceptionally(error);
}
normalStop(); normalStop();
return null; return null;
} }
@ -1070,6 +1084,11 @@ public class SSLFlowDelegate {
} }
} while (true); } while (true);
if (debug.on()) debug.log("finished task execution"); if (debug.on()) debug.log("finished task execution");
HandshakeStatus hs = engine.getHandshakeStatus();
if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) {
// We're no longer handshaking, try setting ALPN
trySetALPN();
}
resumeActivity(); resumeActivity();
} catch (Throwable t) { } catch (Throwable t) {
handleError(t); handleError(t);
@ -1077,6 +1096,17 @@ public class SSLFlowDelegate {
}); });
} }
boolean trySetALPN() {
// complete ALPN CF if needed.
if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) {
applicationBufferSize = engine.getSession().getApplicationBufferSize();
packetBufferSize = engine.getSession().getPacketBufferSize();
setALPN();
return true;
}
return false;
}
// FIXME: acknowledge a received CLOSE request from peer // FIXME: acknowledge a received CLOSE request from peer
EngineResult doClosure(EngineResult r) throws IOException { EngineResult doClosure(EngineResult r) throws IOException {
if (debug.on()) if (debug.on())

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -274,7 +274,11 @@ public class SSLTube implements FlowTube {
@Override @Override
public String toString() { public String toString() {
return "DelegateWrapper:" + delegate.toString(); return "DelegateWrapper[subscribedCalled: " + subscribedCalled
+", subscribedDone: " + subscribedDone
+", completed: " + completed
+", error: " + error
+"]: " + delegate;
} }
} }
@ -288,6 +292,20 @@ public class SSLTube implements FlowTube {
private final AtomicReference<Throwable> errorRef private final AtomicReference<Throwable> errorRef
= new AtomicReference<>(); = new AtomicReference<>();
@Override
public String toString() {
DelegateWrapper sub = subscribed;
DelegateWrapper pend = pendingDelegate.get();
// Though final sslFD may be null if called from within
// SSLFD::connect() as SSLTube is not fully constructed yet.
SSLFlowDelegate sslFD = sslDelegate;
return "SSLSubscriberWrapper[" + SSLTube.this
+ ", delegate: " + (sub == null ? pend :sub)
+ ", getALPN: " + (sslFD == null ? null : sslFD.alpn())
+ ", onCompleteReceived: " + onCompleteReceived
+ ", onError: " + errorRef.get() + "]";
}
// setDelegate can be called asynchronously when the SSLTube flow // setDelegate can be called asynchronously when the SSLTube flow
// is connected. At this time the permanent subscriber (this class) // is connected. At this time the permanent subscriber (this class)
// may already be subscribed (readSubscription != null) or not. // may already be subscribed (readSubscription != null) or not.
@ -319,6 +337,9 @@ public class SSLTube implements FlowTube {
debug.log("SSLSubscriberWrapper (reader) no subscription yet"); debug.log("SSLSubscriberWrapper (reader) no subscription yet");
return; return;
} }
// sslDelegate field should have been initialized by the
// the time we reach here, as there can be no subscriber
// until SSLTube is fully constructed.
if (handleNow || !sslDelegate.resumeReader()) { if (handleNow || !sslDelegate.resumeReader()) {
processPendingSubscriber(); processPendingSubscriber();
} }
@ -429,7 +450,8 @@ public class SSLTube implements FlowTube {
Throwable failed; Throwable failed;
boolean completed; boolean completed;
// reset any demand that may have been made by the previous // reset any demand that may have been made by the previous
// subscriber // subscriber. sslDelegate field should have been initialized,
// since we only reach here when there is a subscriber.
sslDelegate.resetReaderDemand(); sslDelegate.resetReaderDemand();
// send the subscription to the subscriber. // send the subscription to the subscriber.
subscriberImpl.onSubscribe(subscription); subscriberImpl.onSubscribe(subscription);

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -468,7 +468,8 @@ public abstract class SubscriberWrapper
.append(" outputQ size: ").append(Integer.toString(outputQ.size())) .append(" outputQ size: ").append(Integer.toString(outputQ.size()))
//.append(" outputQ: ").append(outputQ.toString()) //.append(" outputQ: ").append(outputQ.toString())
.append(" cf: ").append(cf.toString()) .append(" cf: ").append(cf.toString())
.append(" downstreamSubscription: ").append(downstreamSubscription.toString()); .append(" downstreamSubscription: ").append(downstreamSubscription)
.append(" downstreamSubscriber: ").append(downstreamSubscriber);
return sb.toString(); return sb.toString();
} }

View file

@ -0,0 +1,197 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @summary This test will timeout if the ALPN CF is not completed
* when a 'Connection reset by peer' exception is raised
* during the handshake.
* @bug 8217094
* @modules java.net.http
* java.logging
* @build ALPNFailureTest
* @run main/othervm -Djdk.internal.httpclient.debug=true ALPNFailureTest HTTP_1_1
* @run main/othervm ALPNFailureTest HTTP_2
*/
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ProxySelector;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.StandardSocketOptions;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpTimeoutException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class ALPNFailureTest {
public static void main(String[] args) throws Exception{
if (args == null || args.length == 0) {
args = new String[] {HttpClient.Version.HTTP_1_1.name()};
}
ServerSocket socket = ServerSocketFactory.getDefault()
.createServerSocket(0, 10, InetAddress.getLoopbackAddress());
test(socket, null, null, args);
}
public static void test(ServerSocket socket, SSLContext context,
ProxySelector ps, String... args)
throws Exception
{
System.out.println("Tests a race condition in SSLTube/SSLFlowDelegate");
System.out.println("This test will timeout if the ALPN CF is not completed" +
" when a 'Connection reset by peer' exception is raised" +
" during the handshake - see 8217094.");
URI uri = new URI("https", null,
socket.getInetAddress().getHostAddress(), socket.getLocalPort(),
"/ReadOnlyServer/https_1_1/", null, null);
HttpRequest request1 = HttpRequest.newBuilder(uri)
.GET().build();
HttpRequest request2 = HttpRequest.newBuilder(uri)
.POST(HttpRequest.BodyPublishers.ofString("foo")).build();
ReadOnlyServer server = new ReadOnlyServer(socket);
Thread serverThread = new Thread(server, "ServerThread");
serverThread.start();
try {
for (var arg : args) {
var version = HttpClient.Version.valueOf(arg);
HttpClient.Builder builder = HttpClient.newBuilder()
.version(version);
if (ps != null) builder.proxy(ps);
if (context != null) builder.sslContext(context);
HttpClient client = builder.build();
for (var request : List.of(request1, request2)) {
System.out.println("Server is " + socket.getLocalSocketAddress()
+ ", Version is " + version + ", Method is " + request.method()
+ (ps == null ? ", no proxy"
: (", Proxy is " + ps.select(request.uri()))));
try {
HttpResponse<String> resp =
client.send(request, HttpResponse.BodyHandlers.ofString());
throw new AssertionError(
"Client should not have received any response: " + resp);
} catch (HttpTimeoutException x) {
System.out.println("Unexpected " + x);
x.printStackTrace();
throw new AssertionError("Unexpected exception " + x, x);
} catch (Exception x) {
// We expect IOException("Connection reset by peer"), but
// any exception would do: we just don't want to linger
// forever.
System.err.println("Client got expected exception: " + x);
x.printStackTrace(System.out);
}
}
}
} finally {
server.close();
}
}
public static class ReadOnlyServer implements Runnable, Closeable {
final ServerSocket socket;
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final AtomicBoolean closing = new AtomicBoolean();
ReadOnlyServer(ServerSocket socket) {
this.socket = socket;
}
@Override
public void run() {
int count = 0;
int all = 0;
try {
System.out.println("Server starting");
while (!closing.get()) {
all += count;
count = 0;
try (Socket client = socket.accept()) {
client.setSoTimeout(1000);
client.setOption(StandardSocketOptions.SO_LINGER, 0);
InputStream is = client.getInputStream();
OutputStream os = client.getOutputStream();
boolean drain = true;
int timeouts = 0;
// now read some byte from the ClientHello
// and abruptly close the socket.
while (drain) {
try {
is.read();
count++;
if (count >= 50) {
drain = false;
}
} catch (SocketTimeoutException so) {
// make sure we read something
if (count > 0) timeouts++;
if (timeouts == 5) {
// presumably the client is
// waiting for us to answer...
// but we should not reach here.
drain = false;
}
}
}
System.out.println("Got " + count + " bytes");
}
}
} catch (Throwable t) {
if (!closing.get()) {
errorRef.set(t);
t.printStackTrace();
}
} finally {
System.out.println("Server existing after reading " + (all + count) + " bytes");
close();
}
}
@Override
public void close() {
if (closing.getAndSet(true))
return; // already closed
try {
socket.close();
} catch (IOException x) {
System.out.println("Exception while closing: " + x);
}
}
}
}

View file

@ -0,0 +1,82 @@
/*
* Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* @test
* @summary This test will timeout if the ALPN CF is not completed
* when a 'Connection reset by peer' exception is raised
* during the handshake.
* @bug 8217094
* @library /test/lib http2/server
* @build jdk.test.lib.net.SimpleSSLContext HttpServerAdapters DigestEchoServer
* ALPNFailureTest ALPNProxyFailureTest
* @modules java.net.http/jdk.internal.net.http.common
* java.net.http/jdk.internal.net.http.frame
* java.net.http/jdk.internal.net.http.hpack
* java.logging
* java.base/sun.net.www.http
* java.base/sun.net.www
* java.base/sun.net
* @build ALPNFailureTest
* @run main/othervm -Djdk.internal.httpclient.debug=true -Dtest.nolinger=true ALPNProxyFailureTest HTTP_1_1
* @run main/othervm -Dtest.nolinger=true ALPNProxyFailureTest HTTP_2
*/
import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
import jdk.test.lib.net.SimpleSSLContext;
import java.net.InetAddress;
import java.net.ProxySelector;
import java.net.ServerSocket;
import java.net.http.HttpClient;
public class ALPNProxyFailureTest extends ALPNFailureTest {
static final SSLContext context;
static {
try {
context = new SimpleSSLContext().get();
SSLContext.setDefault(context);
} catch (Exception x) {
throw new ExceptionInInitializerError(x);
}
}
public static void main(String[] args) throws Exception{
if (args == null || args.length == 0) {
args = new String[] {HttpClient.Version.HTTP_1_1.name()};
}
ServerSocket socket = ServerSocketFactory.getDefault()
.createServerSocket(0, 10, InetAddress.getLoopbackAddress());
DigestEchoServer.TunnelingProxy proxy = DigestEchoServer.createHttpsProxyTunnel(
DigestEchoServer.HttpAuthSchemeType.NONE);
ProxySelector ps = ProxySelector.of(proxy.getProxyAddress());
try {
test(socket, context, ps, args);
} finally {
proxy.stop();
}
}
}

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -41,6 +41,7 @@ import java.net.MalformedURLException;
import java.net.PasswordAuthentication; import java.net.PasswordAuthentication;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.StandardSocketOptions;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
@ -77,6 +78,8 @@ public abstract class DigestEchoServer implements HttpServerAdapters {
public static final boolean DEBUG = public static final boolean DEBUG =
Boolean.parseBoolean(System.getProperty("test.debug", "false")); Boolean.parseBoolean(System.getProperty("test.debug", "false"));
public static final boolean NO_LINGER =
Boolean.parseBoolean(System.getProperty("test.nolinger", "false"));
public enum HttpAuthType { public enum HttpAuthType {
SERVER, PROXY, SERVER307, PROXY305 SERVER, PROXY, SERVER307, PROXY305
/* add PROXY_AND_SERVER and SERVER_PROXY_NONE */ /* add PROXY_AND_SERVER and SERVER_PROXY_NONE */
@ -1603,6 +1606,11 @@ public abstract class DigestEchoServer implements HttpServerAdapters {
Socket toClose; Socket toClose;
try { try {
toClose = clientConnection = ss.accept(); toClose = clientConnection = ss.accept();
if (NO_LINGER) {
// can be useful to trigger "Connection reset by peer"
// errors on the client side.
clientConnection.setOption(StandardSocketOptions.SO_LINGER, 0);
}
} catch (IOException io) { } catch (IOException io) {
if (DEBUG || !stopped) io.printStackTrace(System.out); if (DEBUG || !stopped) io.printStackTrace(System.out);
break; break;