8212926: HttpClient does not retrieve files with large sizes over HTTP/1.1

Reviewed-by: chegar, dfuchs
This commit is contained in:
Michael McMahon 2018-10-25 12:09:41 +01:00
parent 0d815c2677
commit ae21c81dd7
3 changed files with 192 additions and 16 deletions

View file

@ -236,20 +236,20 @@ class Http1Response<T> {
* Return known fixed content length or -1 if chunked, or -2 if no content-length * Return known fixed content length or -1 if chunked, or -2 if no content-length
* information in which case, connection termination delimits the response body * information in which case, connection termination delimits the response body
*/ */
int fixupContentLen(int clen) { long fixupContentLen(long clen) {
if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) { if (request.method().equalsIgnoreCase("HEAD") || responseCode == HTTP_NOT_MODIFIED) {
return 0; return 0L;
} }
if (clen == -1) { if (clen == -1L) {
if (headers.firstValue("Transfer-encoding").orElse("") if (headers.firstValue("Transfer-encoding").orElse("")
.equalsIgnoreCase("chunked")) { .equalsIgnoreCase("chunked")) {
return -1; return -1L;
} }
if (responseCode == 101) { if (responseCode == 101) {
// this is a h2c or websocket upgrade, contentlength must be zero // this is a h2c or websocket upgrade, contentlength must be zero
return 0; return 0L;
} }
return -2; return -2L;
} }
return clen; return clen;
} }
@ -383,9 +383,8 @@ class Http1Response<T> {
final CompletableFuture<U> cf = new MinimalFuture<>(); final CompletableFuture<U> cf = new MinimalFuture<>();
int clen0 = (int)headers.firstValueAsLong("Content-Length").orElse(-1); long clen0 = headers.firstValueAsLong("Content-Length").orElse(-1L);
final long clen = fixupContentLen(clen0);
final int clen = fixupContentLen(clen0);
// expect-continue reads headers and body twice. // expect-continue reads headers and body twice.
// if we reach here, we must reset the headersReader state. // if we reach here, we must reset the headersReader state.

View file

@ -46,7 +46,7 @@ import static java.lang.String.format;
class ResponseContent { class ResponseContent {
final HttpResponse.BodySubscriber<?> pusher; final HttpResponse.BodySubscriber<?> pusher;
final int contentLength; final long contentLength;
final HttpHeaders headers; final HttpHeaders headers;
// this needs to run before we complete the body // this needs to run before we complete the body
// so that connection can be returned to pool // so that connection can be returned to pool
@ -54,7 +54,7 @@ class ResponseContent {
private final String dbgTag; private final String dbgTag;
ResponseContent(HttpConnection connection, ResponseContent(HttpConnection connection,
int contentLength, long contentLength,
HttpHeaders h, HttpHeaders h,
HttpResponse.BodySubscriber<?> userSubscriber, HttpResponse.BodySubscriber<?> userSubscriber,
Runnable onFinished) Runnable onFinished)
@ -474,14 +474,14 @@ class ResponseContent {
} }
class FixedLengthBodyParser implements BodyParser { class FixedLengthBodyParser implements BodyParser {
final int contentLength; final long contentLength;
final Consumer<Throwable> onComplete; final Consumer<Throwable> onComplete;
final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG); final Logger debug = Utils.getDebugLogger(this::dbgString, Utils.DEBUG);
final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser"; final String dbgTag = ResponseContent.this.dbgTag + "/FixedLengthBodyParser";
volatile int remaining; volatile long remaining;
volatile Throwable closedExceptionally; volatile Throwable closedExceptionally;
volatile AbstractSubscription sub; volatile AbstractSubscription sub;
FixedLengthBodyParser(int contentLength, Consumer<Throwable> onComplete) { FixedLengthBodyParser(long contentLength, Consumer<Throwable> onComplete) {
this.contentLength = this.remaining = contentLength; this.contentLength = this.remaining = contentLength;
this.onComplete = onComplete; this.onComplete = onComplete;
} }
@ -527,7 +527,7 @@ class ResponseContent {
} }
boolean completed = false; boolean completed = false;
try { try {
int unfulfilled = remaining; long unfulfilled = remaining;
if (debug.on()) if (debug.on())
debug.log("Parser got %d bytes (%d remaining / %d)", debug.log("Parser got %d bytes (%d remaining / %d)",
b.remaining(), unfulfilled, contentLength); b.remaining(), unfulfilled, contentLength);
@ -541,7 +541,7 @@ class ResponseContent {
// demand. // demand.
boolean hasDemand = sub.demand().tryDecrement(); boolean hasDemand = sub.demand().tryDecrement();
assert hasDemand; assert hasDemand;
int amount = Math.min(b.remaining(), unfulfilled); int amount = (int)Math.min(b.remaining(), unfulfilled); // safe cast
unfulfilled = remaining -= amount; unfulfilled = remaining -= amount;
ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount); ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount);
pusher.onNext(List.of(buffer.asReadOnlyBuffer())); pusher.onNext(List.of(buffer.asReadOnlyBuffer()));

View file

@ -0,0 +1,177 @@
/*
* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
/**
* @test
* @bug 8212926
* @summary Basic tests for response timeouts
* @run main/othervm LargeResponseContent
*/
public class LargeResponseContent {
final ServerSocket server;
final int port;
public LargeResponseContent() throws Exception {
server = new ServerSocket(0, 10, InetAddress.getLoopbackAddress());
Thread serverThread = new Thread(this::handleConnection);
serverThread.setDaemon(false);
port = server.getLocalPort();
serverThread.start();
}
void runClient() throws IOException, InterruptedException {
URI uri = URI.create("http://127.0.0.1:" + Integer.toString(port) + "/foo");
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(uri)
.GET()
.build();
HttpResponse<Long> response = client.send(request, new ClientHandler());
System.out.println("Response code = " + response.statusCode());
long blen = response.body();
if (blen != CONTENT_LEN)
throw new RuntimeException("wrong content length");
}
public static void main(String[] args) throws Exception {
System.out.println ("CONTENT_LEN = " + CONTENT_LEN);
System.out.println ("CLEN_STR = " + CLEN_STR);
LargeResponseContent test = new LargeResponseContent();
test.runClient();
}
static class ClientHandler implements HttpResponse.BodyHandler<Long> {
@Override
public HttpResponse.BodySubscriber<Long> apply(HttpResponse.ResponseInfo responseInfo) {
HttpHeaders headers = responseInfo.headers();
headers.firstValue("content-length");
long clen = headers.firstValueAsLong("content-length").orElse(-1);
if (clen != CONTENT_LEN)
return new Subscriber(new RuntimeException("Wrong content length received"));
return new Subscriber(null);
}
}
static class Subscriber implements HttpResponse.BodySubscriber<Long> {
final CompletableFuture<Long> cf = new CompletableFuture<>();
volatile Flow.Subscription subscription;
volatile long counter = 0;
Subscriber(Throwable t) {
if (t != null)
cf.completeExceptionally(t);
}
@Override
public CompletionStage<Long> getBody() {
return cf;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(List<ByteBuffer> item) {
long v = 0;
for (ByteBuffer b : item)
v+= b.remaining();
counter += v;
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
cf.complete(counter);
}
}
static final long CONTENT_LEN = Integer.MAX_VALUE + 1000L;
static final String CLEN_STR = Long.valueOf(CONTENT_LEN).toString();
static String RESPONSE = "HTTP/1.1 200 OK\r\n" +
"Content-length: " + CLEN_STR + "\r\n" +
"\r\n";
void readHeaders(InputStream is) throws IOException {
String s = "";
byte[] buf = new byte[128];
while (!s.endsWith("\r\n\r\n")) {
int c = is.read(buf);
String f = new String(buf, 0, c, StandardCharsets.ISO_8859_1);
s = s + f;
}
}
public void handleConnection() {
long remaining = CONTENT_LEN;
try {
Socket socket = server.accept();
InputStream is = socket.getInputStream();
readHeaders(is); // read first byte
OutputStream os = socket.getOutputStream();
os.write(RESPONSE.getBytes());
byte[] buf = new byte[64 * 1024];
while (remaining > 0) {
int amount = (int)Math.min(remaining, buf.length);
os.write(buf, 0, amount);
remaining -= amount;
}
System.out.println("Server: finished writing");
os.close();
} catch (IOException e) {
long sent = CONTENT_LEN - remaining;
System.out.println("Sent " + sent);
e.printStackTrace();
}
}
}