8208391: Differentiate response and connect timeouts in HTTP Client API

Reviewed-by: michaelm
This commit is contained in:
Chris Hegarty 2018-08-09 11:23:12 +01:00
parent e850549b71
commit 166030817f
32 changed files with 1280 additions and 56 deletions

View file

@ -34,6 +34,7 @@ import java.net.ProxySelector;
import java.net.URLPermission;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@ -84,6 +85,7 @@ import jdk.internal.net.http.HttpClientBuilderImpl;
* <pre>{@code HttpClient client = HttpClient.newBuilder()
* .version(Version.HTTP_1_1)
* .followRedirects(Redirect.NORMAL)
* .connectTimeout(Duration.ofSeconds(20))
* .proxy(ProxySelector.of(new InetSocketAddress("proxy.example.com", 80)))
* .authenticator(Authenticator.getDefault())
* .build();
@ -94,7 +96,7 @@ import jdk.internal.net.http.HttpClientBuilderImpl;
* <p><b>Asynchronous Example</b>
* <pre>{@code HttpRequest request = HttpRequest.newBuilder()
* .uri(URI.create("https://foo.com/"))
* .timeout(Duration.ofMinutes(1))
* .timeout(Duration.ofMinutes(2))
* .header("Content-Type", "application/json")
* .POST(BodyPublishers.ofFile(Paths.get("file.json")))
* .build();
@ -196,6 +198,26 @@ public abstract class HttpClient {
*/
public Builder cookieHandler(CookieHandler cookieHandler);
/**
* Sets the connect timeout duration for this client.
*
* <p> In the case where a new connection needs to be established, if
* the connection cannot be established within the given {@code
* duration}, then {@link HttpClient#send(HttpRequest,BodyHandler)
* HttpClient::send} throws an {@link HttpConnectTimeoutException}, or
* {@link HttpClient#sendAsync(HttpRequest,BodyHandler)
* HttpClient::sendAsync} completes exceptionally with an
* {@code HttpConnectTimeoutException}. If a new connection does not
* need to be established, for example if a connection can be reused
* from a previous request, then this timeout duration has no effect.
*
* @param duration the duration to allow the underlying connection to be
* established
* @return this builder
* @throws IllegalArgumentException if the duration is non-positive
*/
public Builder connectTimeout(Duration duration);
/**
* Sets an {@code SSLContext}.
*
@ -344,6 +366,17 @@ public abstract class HttpClient {
*/
public abstract Optional<CookieHandler> cookieHandler();
/**
* Returns an {@code Optional} containing the <i>connect timeout duration</i>
* for this client. If the {@linkplain Builder#connectTimeout(Duration)
* connect timeout duration} was not set in the client's builder, then the
* {@code Optional} is empty.
*
* @return an {@code Optional} containing this client's connect timeout
* duration
*/
public abstract Optional<Duration> connectTimeout();
/**
* Returns the follow redirects policy for this client. The default value
* for client's built by builders that do not specify a redirect policy is

View file

@ -0,0 +1,48 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.net.http;
/**
* Thrown when a connection, over which an {@code HttpRequest} is intended to be
* sent, is not successfully established within a specified time period.
*
* @since 11
*/
public class HttpConnectTimeoutException extends HttpTimeoutException {
private static final long serialVersionUID = 321L + 11L;
/**
* Constructs an {@code HttpConnectTimeoutException} with the given detail
* message.
*
* @param message
* The detail message; can be {@code null}
*/
public HttpConnectTimeoutException(String message) {
super(message);
}
}

View file

@ -80,11 +80,9 @@ abstract class AbstractAsyncSSLConnection extends HttpConnection
engine = createEngine(context, serverName.getName(), port, sslParameters);
}
abstract HttpConnection plainConnection();
abstract SSLTube getConnectionFlow();
final CompletableFuture<String> getALPN() {
assert connected();
return getConnectionFlow().getALPN();
}

View file

@ -28,6 +28,8 @@ package jdk.internal.net.http;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SSLTube;
import jdk.internal.net.http.common.Utils;
@ -49,14 +51,9 @@ class AsyncSSLConnection extends AbstractAsyncSSLConnection {
}
@Override
PlainHttpConnection plainConnection() {
return plainConnection;
}
@Override
public CompletableFuture<Void> connectAsync() {
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
return plainConnection
.connectAsync()
.connectAsync(exchange)
.thenApply( unused -> {
// create the SSLTube wrapping the SocketTube, with the given engine
flow = new SSLTube(engine,
@ -66,6 +63,21 @@ class AsyncSSLConnection extends AbstractAsyncSSLConnection {
return null; } );
}
@Override
public CompletableFuture<Void> finishConnect() {
// The actual ALPN value, which may be the empty string, is not
// interesting at this point, only that the handshake has completed.
return getALPN()
.handle((String unused, Throwable ex) -> {
if (ex == null) {
return plainConnection.finishConnect();
} else {
plainConnection.close();
return MinimalFuture.<Void>failedFuture(ex);
} })
.thenCompose(Function.identity());
}
@Override
boolean connected() {
return plainConnection.connected();

View file

@ -29,6 +29,8 @@ import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
import java.net.http.HttpHeaders;
import java.util.function.Function;
import jdk.internal.net.http.common.MinimalFuture;
import jdk.internal.net.http.common.SSLTube;
import jdk.internal.net.http.common.Utils;
@ -53,13 +55,13 @@ class AsyncSSLTunnelConnection extends AbstractAsyncSSLConnection {
}
@Override
public CompletableFuture<Void> connectAsync() {
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
if (debug.on()) debug.log("Connecting plain tunnel connection");
// This will connect the PlainHttpConnection flow, so that
// its HttpSubscriber and HttpPublisher are subscribed to the
// SocketTube
return plainConnection
.connectAsync()
.connectAsync(exchange)
.thenApply( unused -> {
if (debug.on()) debug.log("creating SSLTube");
// create the SSLTube wrapping the SocketTube, with the given engine
@ -70,6 +72,21 @@ class AsyncSSLTunnelConnection extends AbstractAsyncSSLConnection {
return null;} );
}
@Override
public CompletableFuture<Void> finishConnect() {
// The actual ALPN value, which may be the empty string, is not
// interesting at this point, only that the handshake has completed.
return getALPN()
.handle((String unused, Throwable ex) -> {
if (ex == null) {
return plainConnection.finishConnect();
} else {
plainConnection.close();
return MinimalFuture.<Void>failedFuture(ex);
} })
.thenCompose(Function.identity());
}
@Override
boolean isTunnel() { return true; }
@ -86,11 +103,6 @@ class AsyncSSLTunnelConnection extends AbstractAsyncSSLConnection {
return "AsyncSSLTunnelConnection: " + super.toString();
}
@Override
PlainTunnelingConnection plainConnection() {
return plainConnection;
}
@Override
ConnectionPool.CacheKey cacheKey() {
return ConnectionPool.cacheKey(address, plainConnection.proxyAddr);

View file

@ -83,6 +83,10 @@ final class Exchange<T> {
final PushGroup<T> pushGroup;
final String dbgTag;
// Keeps track of the underlying connection when establishing an HTTP/2
// exchange so that it can be aborted/timed out mid setup.
final ConnectionAborter connectionAborter = new ConnectionAborter();
Exchange(HttpRequestImpl request, MultiExchange<T> multi) {
this.request = request;
this.upgrading = false;
@ -125,6 +129,27 @@ final class Exchange<T> {
return client;
}
// Keeps track of the underlying connection when establishing an HTTP/2
// exchange so that it can be aborted/timed out mid setup.
static final class ConnectionAborter {
private volatile HttpConnection connection;
void connection(HttpConnection connection) {
this.connection = connection;
}
void closeConnection() {
HttpConnection connection = this.connection;
this.connection = null;
if (connection != null) {
try {
connection.close();
} catch (Throwable t) {
// ignore
}
}
}
}
public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
// If we received a 407 while establishing the exchange
@ -179,6 +204,7 @@ final class Exchange<T> {
}
public void cancel(IOException cause) {
if (debug.on()) debug.log("cancel exchImpl: %s, with \"%s\"", exchImpl, cause);
// If the impl is non null, propagate the exception right away.
// Otherwise record it so that it can be propagated once the
// exchange impl has been established.
@ -190,6 +216,11 @@ final class Exchange<T> {
} else {
// no impl yet. record the exception
failed = cause;
// abort/close the connection if setting up the exchange. This can
// be important when setting up HTTP/2
connectionAborter.closeConnection();
// now call checkCancelled to recheck the impl.
// if the failed state is set and the impl is not null, reset
// the failed state and propagate the exception to the impl.

View file

@ -85,7 +85,7 @@ abstract class ExchangeImpl<T> {
} else {
Http2ClientImpl c2 = exchange.client().client2(); // #### improve
HttpRequestImpl request = exchange.request();
CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request);
CompletableFuture<Http2Connection> c2f = c2.getConnectionFor(request, exchange);
if (debug.on())
debug.log("get: Trying to get HTTP/2 connection");
return c2f.handle((h2c, t) -> createExchangeImpl(h2c, t, exchange, connection))

View file

@ -233,7 +233,8 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
CompletableFuture<Void> connectCF;
if (!connection.connected()) {
if (debug.on()) debug.log("initiating connect async");
connectCF = connection.connectAsync();
connectCF = connection.connectAsync(exchange)
.thenCompose(unused -> connection.finishConnect());
Throwable cancelled;
synchronized (lock) {
if ((cancelled = failed) == null) {

View file

@ -90,7 +90,8 @@ class Http2ClientImpl {
* 3. completes normally with null: no connection in cache for h2c or h2 failed previously
* 4. completes normally with connection: h2 or h2c connection in cache. Use it.
*/
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req) {
CompletableFuture<Http2Connection> getConnectionFor(HttpRequestImpl req,
Exchange<?> exchange) {
URI uri = req.uri();
InetSocketAddress proxy = req.proxy();
String key = Http2Connection.keyFor(uri, proxy);
@ -123,7 +124,7 @@ class Http2ClientImpl {
}
}
return Http2Connection
.createAsync(req, this)
.createAsync(req, this, exchange)
.whenComplete((conn, t) -> {
synchronized (Http2ClientImpl.this) {
if (conn != null) {

View file

@ -353,7 +353,8 @@ class Http2Connection {
// Requires TLS handshake. So, is really async
static CompletableFuture<Http2Connection> createAsync(HttpRequestImpl request,
Http2ClientImpl h2client) {
Http2ClientImpl h2client,
Exchange<?> exchange) {
assert request.secure();
AbstractAsyncSSLConnection connection = (AbstractAsyncSSLConnection)
HttpConnection.getConnection(request.getAddress(),
@ -361,7 +362,12 @@ class Http2Connection {
request,
HttpClient.Version.HTTP_2);
return connection.connectAsync()
// Expose the underlying connection to the exchange's aborter so it can
// be closed if a timeout occurs.
exchange.connectionAborter.connection(connection);
return connection.connectAsync(exchange)
.thenCompose(unused -> connection.finishConnect())
.thenCompose(unused -> checkSSLConfig(connection))
.thenCompose(notused-> {
CompletableFuture<Http2Connection> cf = new MinimalFuture<>();

View file

@ -28,6 +28,7 @@ package jdk.internal.net.http;
import java.net.Authenticator;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.time.Duration;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
@ -38,6 +39,7 @@ import static java.util.Objects.requireNonNull;
public class HttpClientBuilderImpl implements HttpClient.Builder {
CookieHandler cookieHandler;
Duration connectTimeout;
HttpClient.Redirect followRedirects;
ProxySelector proxy;
Authenticator authenticator;
@ -55,6 +57,14 @@ public class HttpClientBuilderImpl implements HttpClient.Builder {
return this;
}
@Override
public HttpClientBuilderImpl connectTimeout(Duration duration) {
requireNonNull(duration);
if (duration.isNegative() || Duration.ZERO.equals(duration))
throw new IllegalArgumentException("Invalid duration: " + duration);
this.connectTimeout = duration;
return this;
}
@Override
public HttpClientBuilderImpl sslContext(SSLContext sslContext) {

View file

@ -30,6 +30,7 @@ import java.lang.ref.Reference;
import java.net.Authenticator;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@ -69,6 +70,11 @@ final class HttpClientFacade extends HttpClient implements Trackable {
return impl.cookieHandler();
}
@Override
public Optional<Duration> connectTimeout() {
return impl.connectTimeout();
}
@Override
public Redirect followRedirects() {
return impl.followRedirects();

View file

@ -35,6 +35,7 @@ import java.net.Authenticator;
import java.net.ConnectException;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
@ -47,6 +48,7 @@ import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedAction;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@ -154,6 +156,7 @@ final class HttpClientImpl extends HttpClient implements Trackable {
}
private final CookieHandler cookieHandler;
private final Duration connectTimeout;
private final Redirect followRedirects;
private final Optional<ProxySelector> userProxySelector;
private final ProxySelector proxySelector;
@ -278,6 +281,7 @@ final class HttpClientImpl extends HttpClient implements Trackable {
facadeRef = new WeakReference<>(facadeFactory.createFacade(this));
client2 = new Http2ClientImpl(this);
cookieHandler = builder.cookieHandler;
connectTimeout = builder.connectTimeout;
followRedirects = builder.followRedirects == null ?
Redirect.NEVER : builder.followRedirects;
this.userProxySelector = Optional.ofNullable(builder.proxy);
@ -547,6 +551,10 @@ final class HttpClientImpl extends HttpClient implements Trackable {
throw new IllegalArgumentException(msg, throwable);
} else if (throwable instanceof SecurityException) {
throw new SecurityException(msg, throwable);
} else if (throwable instanceof HttpConnectTimeoutException) {
HttpConnectTimeoutException hcte = new HttpConnectTimeoutException(msg);
hcte.initCause(throwable);
throw hcte;
} else if (throwable instanceof HttpTimeoutException) {
throw new HttpTimeoutException(msg);
} else if (throwable instanceof ConnectException) {
@ -1123,6 +1131,11 @@ final class HttpClientImpl extends HttpClient implements Trackable {
return Optional.ofNullable(cookieHandler);
}
@Override
public Optional<Duration> connectTimeout() {
return Optional.ofNullable(connectTimeout);
}
@Override
public Optional<ProxySelector> proxy() {
return this.userProxySelector;

View file

@ -108,9 +108,20 @@ abstract class HttpConnection implements Closeable {
return client;
}
//public abstract void connect() throws IOException, InterruptedException;
/**
* Initiates the connect phase.
*
* Returns a CompletableFuture that completes when the underlying
* TCP connection has been established or an error occurs.
*/
public abstract CompletableFuture<Void> connectAsync(Exchange<?> exchange);
public abstract CompletableFuture<Void> connectAsync();
/**
* Finishes the connection phase.
*
* Returns a CompletableFuture that completes when any additional,
* type specific, setup has been done. Must be called after connectAsync. */
public abstract CompletableFuture<Void> finishConnect();
/** Tells whether, or not, this connection is connected to its destination. */
abstract boolean connected();

View file

@ -27,7 +27,7 @@ package jdk.internal.net.http;
import java.io.IOException;
import java.net.ConnectException;
import java.time.Duration;
import java.net.http.HttpConnectTimeoutException;
import java.util.Iterator;
import java.util.LinkedList;
import java.security.AccessControlContext;
@ -88,7 +88,7 @@ class MultiExchange<T> {
);
private final LinkedList<HeaderFilter> filters;
TimedEvent timedEvent;
ResponseTimerEvent responseTimerEvent;
volatile boolean cancelled;
final PushGroup<T> pushGroup;
@ -134,7 +134,7 @@ class MultiExchange<T> {
this.exchange = new Exchange<>(request, this);
}
private synchronized Exchange<T> getExchange() {
synchronized Exchange<T> getExchange() {
return exchange;
}
@ -157,8 +157,8 @@ class MultiExchange<T> {
}
private void cancelTimer() {
if (timedEvent != null) {
client.cancelTimer(timedEvent);
if (responseTimerEvent != null) {
client.cancelTimer(responseTimerEvent);
}
}
@ -220,8 +220,8 @@ class MultiExchange<T> {
cf = failedFuture(new IOException("Too many retries", retryCause));
} else {
if (currentreq.timeout().isPresent()) {
timedEvent = new TimedEvent(currentreq.timeout().get());
client.registerTimer(timedEvent);
responseTimerEvent = ResponseTimerEvent.of(this);
client.registerTimer(responseTimerEvent);
}
try {
// 1. apply request filters
@ -344,7 +344,9 @@ class MultiExchange<T> {
}
}
if (cancelled && t instanceof IOException) {
t = new HttpTimeoutException("request timed out");
if (!(t instanceof HttpTimeoutException)) {
t = toTimeoutException((IOException)t);
}
} else if (retryOnFailure(t)) {
Throwable cause = retryCause(t);
@ -378,17 +380,24 @@ class MultiExchange<T> {
return failedFuture(t);
}
class TimedEvent extends TimeoutEvent {
TimedEvent(Duration duration) {
super(duration);
}
@Override
public void handle() {
if (debug.on()) {
debug.log("Cancelling MultiExchange due to timeout for request %s",
request);
private HttpTimeoutException toTimeoutException(IOException ioe) {
HttpTimeoutException t = null;
// more specific, "request timed out", when connected
Exchange<?> exchange = getExchange();
if (exchange != null) {
ExchangeImpl<?> exchangeImpl = exchange.exchImpl;
if (exchangeImpl != null) {
if (exchangeImpl.connection().connected()) {
t = new HttpTimeoutException("request timed out");
t.initCause(ioe);
}
}
cancel(new HttpTimeoutException("request timed out"));
}
if (t == null) {
t = new HttpConnectTimeoutException("HTTP connect timed out");
t.initCause(new ConnectException("HTTP connect timed out"));
}
return t;
}
}

View file

@ -26,6 +26,7 @@
package jdk.internal.net.http;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
@ -34,6 +35,7 @@ import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Log;
@ -53,9 +55,52 @@ class PlainHttpConnection extends HttpConnection {
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private volatile boolean connected;
private boolean closed;
private volatile ConnectTimerEvent connectTimerEvent; // may be null
// should be volatile to provide proper synchronization(visibility) action
/**
* Returns a ConnectTimerEvent iff there is a connect timeout duration,
* otherwise null.
*/
private ConnectTimerEvent newConnectTimer(Exchange<?> exchange,
CompletableFuture<Void> cf) {
Duration duration = client().connectTimeout().orElse(null);
if (duration != null) {
ConnectTimerEvent cte = new ConnectTimerEvent(duration, exchange, cf);
return cte;
}
return null;
}
final class ConnectTimerEvent extends TimeoutEvent {
private final CompletableFuture<Void> cf;
private final Exchange<?> exchange;
ConnectTimerEvent(Duration duration,
Exchange<?> exchange,
CompletableFuture<Void> cf) {
super(duration);
this.exchange = exchange;
this.cf = cf;
}
@Override
public void handle() {
if (debug.on()) {
debug.log("HTTP connect timed out");
}
ConnectException ce = new ConnectException("HTTP connect timed out");
exchange.multi.cancel(ce);
client().theExecutor().execute(() -> cf.completeExceptionally(ce));
}
@Override
public String toString() {
return "ConnectTimerEvent, " + super.toString();
}
}
final class ConnectEvent extends AsyncEvent {
private final CompletableFuture<Void> cf;
@ -85,7 +130,6 @@ class PlainHttpConnection extends HttpConnection {
if (debug.on())
debug.log("ConnectEvent: connect finished: %s Local addr: %s",
finished, chan.getLocalAddress());
connected = true;
// complete async since the event runs on the SelectorManager thread
cf.completeAsync(() -> null, client().theExecutor());
} catch (Throwable e) {
@ -103,12 +147,20 @@ class PlainHttpConnection extends HttpConnection {
}
@Override
public CompletableFuture<Void> connectAsync() {
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
CompletableFuture<Void> cf = new MinimalFuture<>();
try {
assert !connected : "Already connected";
assert !chan.isBlocking() : "Unexpected blocking channel";
boolean finished = false;
boolean finished;
connectTimerEvent = newConnectTimer(exchange, cf);
if (connectTimerEvent != null) {
if (debug.on())
debug.log("registering connect timer: " + connectTimerEvent);
client().registerTimer(connectTimerEvent);
}
PrivilegedExceptionAction<Boolean> pa =
() -> chan.connect(Utils.resolveAddress(address));
try {
@ -118,7 +170,6 @@ class PlainHttpConnection extends HttpConnection {
}
if (finished) {
if (debug.on()) debug.log("connect finished without blocking");
connected = true;
cf.complete(null);
} else {
if (debug.on()) debug.log("registering connect event");
@ -136,6 +187,16 @@ class PlainHttpConnection extends HttpConnection {
return cf;
}
@Override
public CompletableFuture<Void> finishConnect() {
assert connected == false;
if (debug.on()) debug.log("finishConnect, setting connected=true");
connected = true;
if (connectTimerEvent != null)
client().cancelTimer(connectTimerEvent);
return MinimalFuture.completedFuture(null);
}
@Override
SocketChannel channel() {
return chan;
@ -210,6 +271,8 @@ class PlainHttpConnection extends HttpConnection {
Log.logTrace("Closing: " + toString());
if (debug.on())
debug.log("Closing channel: " + client().debugInterestOps(chan));
if (connectTimerEvent != null)
client().cancelTimer(connectTimerEvent);
chan.close();
tube.signalClosed();
} catch (IOException e) {

View file

@ -26,11 +26,13 @@
package jdk.internal.net.http;
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.http.HttpTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.net.http.HttpHeaders;
import jdk.internal.net.http.common.FlowTube;
@ -60,9 +62,10 @@ final class PlainTunnelingConnection extends HttpConnection {
}
@Override
public CompletableFuture<Void> connectAsync() {
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
if (debug.on()) debug.log("Connecting plain connection");
return delegate.connectAsync()
return delegate.connectAsync(exchange)
.thenCompose(unused -> delegate.finishConnect())
.thenCompose((Void v) -> {
if (debug.on()) debug.log("sending HTTP/1.1 CONNECT");
HttpClientImpl client = client();
@ -70,7 +73,7 @@ final class PlainTunnelingConnection extends HttpConnection {
HttpRequestImpl req = new HttpRequestImpl("CONNECT", address, proxyHeaders);
MultiExchange<Void> mulEx = new MultiExchange<>(null, req,
client, discarding(), null, null);
Exchange<Void> connectExchange = new Exchange<>(req, mulEx);
Exchange<Void> connectExchange = mulEx.getExchange();
return connectExchange
.responseAsyncImpl(delegate)
@ -96,14 +99,36 @@ final class PlainTunnelingConnection extends HttpConnection {
ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).drainLeftOverBytes();
int remaining = b.remaining();
assert remaining == 0: "Unexpected remaining: " + remaining;
connected = true;
cf.complete(null);
}
return cf;
});
})
.handle((result, ex) -> {
if (ex == null) {
return MinimalFuture.completedFuture(result);
} else {
if (debug.on())
debug.log("tunnel failed with \"%s\"", ex.toString());
Throwable t = ex;
if (t instanceof CompletionException)
t = t.getCause();
if (t instanceof HttpTimeoutException) {
String msg = "proxy tunneling CONNECT request timed out";
t = new HttpTimeoutException(msg);
t.initCause(ex);
}
return MinimalFuture.<Void>failedFuture(t);
}
})
.thenCompose(Function.identity());
});
}
public CompletableFuture<Void> finishConnect() {
connected = true;
return MinimalFuture.completedFuture(null);
}
@Override
boolean isTunnel() { return true; }

View file

@ -0,0 +1,78 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.internal.net.http;
import java.net.ConnectException;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpTimeoutException;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Utils;
public class ResponseTimerEvent extends TimeoutEvent {
private static final Logger debug =
Utils.getDebugLogger("ResponseTimerEvent"::toString, Utils.DEBUG);
private final MultiExchange<?> multiExchange;
static ResponseTimerEvent of(MultiExchange<?> exchange) {
return new ResponseTimerEvent(exchange);
}
private ResponseTimerEvent(MultiExchange<?> multiExchange) {
super(multiExchange.exchange.request.timeout().get());
this.multiExchange = multiExchange;
}
@Override
public void handle() {
if (debug.on()) {
debug.log("Cancelling MultiExchange due to timeout for request %s",
multiExchange.exchange.request);
}
HttpTimeoutException t = null;
// more specific, "request timed out", message when connected
Exchange<?> exchange = multiExchange.getExchange();
if (exchange != null) {
ExchangeImpl<?> exchangeImpl = exchange.exchImpl;
if (exchangeImpl != null) {
if (exchangeImpl.connection().connected()) {
t = new HttpTimeoutException("request timed out");
}
}
}
if (t == null) {
t = new HttpConnectTimeoutException("HTTP connect timed out");
t.initCause(new ConnectException("HTTP connect timed out"));
}
multiExchange.cancel(t);
}
@Override
public String toString() {
return "ResponseTimerEvent[" + super.toString() + "]";
}
}

View file

@ -43,9 +43,11 @@ abstract class TimeoutEvent implements Comparable<TimeoutEvent> {
// we use id in compareTo to make compareTo consistent with equals
// see TimeoutEvent::compareTo below;
private final long id = COUNTER.incrementAndGet();
private final Duration duration;
private final Instant deadline;
TimeoutEvent(Duration duration) {
this.duration = duration;
deadline = Instant.now().plus(duration);
}
@ -75,6 +77,7 @@ abstract class TimeoutEvent implements Comparable<TimeoutEvent> {
@Override
public String toString() {
return "TimeoutEvent[id=" + id + ", deadline=" + deadline + "]";
return "TimeoutEvent[id=" + id + ", duration=" + duration
+ ", deadline=" + deadline + "]";
}
}