Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.net.ProtocolException;
import java.net.http.HttpClient.Version;
import java.net.http.HttpHeaders;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -64,6 +65,8 @@ final class Exchange<T> {
volatile CompletableFuture<? extends ExchangeImpl<T>> exchangeCF;
volatile CompletableFuture<Void> bodyIgnored;
volatile boolean streamLimitReached;
volatile Response cachedProxyResponse; // cached response from proxy 407
volatile byte[] cachedProxyBody; // cached body from proxy 407 response

// used to record possible cancellation raised before the exchImpl
// has been established.
Expand Down Expand Up @@ -228,6 +231,30 @@ public CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler) {
// an HTTP/2 tunnel through an HTTP/1.1 proxy)
if (bodyIgnored != null) return MinimalFuture.completedFuture(null);

// If we have a cached proxy body (from 407 response), use it
if (cachedProxyBody != null) {
try {
// Use cached response and body
Response proxyResponse = cachedProxyResponse;
byte[] bodyBytes = cachedProxyBody;
cachedProxyResponse = null;
cachedProxyBody = null;

HttpResponse.BodySubscriber<T> subscriber = handler.apply(
new ResponseInfoImpl(proxyResponse));
subscriber.onSubscribe(new java.util.concurrent.Flow.Subscription() {
public void request(long n) {
subscriber.onNext(java.util.List.of(java.nio.ByteBuffer.wrap(bodyBytes)));
subscriber.onComplete();
}
public void cancel() {}
});
return subscriber.getBody().toCompletableFuture();
} catch (Exception e) {
return MinimalFuture.failedFuture(e);
}
}

// The connection will not be returned to the pool in the case of WebSocket
return exchImpl.readBodyAsync(handler, !request.isWebSocket(), parentExecutor)
.whenComplete((r,t) -> exchImpl.completed());
Expand Down Expand Up @@ -455,14 +482,20 @@ public CompletableFuture<Response> responseAsync() {
private CompletableFuture<Response> checkFor407(ExchangeImpl<T> ex, Throwable t,
Function<ExchangeImpl<T>,CompletableFuture<Response>> andThen) {
t = Utils.getCompletionCause(t);
if (t instanceof ProxyAuthenticationRequired) {
if (t instanceof ProxyAuthenticationRequired par) {
if (debug.on()) debug.log("checkFor407: ProxyAuthenticationRequired: building synthetic response");
bodyIgnored = MinimalFuture.completedFuture(null);
Response proxyResponse = ((ProxyAuthenticationRequired)t).proxyResponse;
// Cache the proxy response and body if available
cachedProxyResponse = par.proxyResponse;
cachedProxyBody = par.proxyResponseBody;
// Don't set bodyIgnored if we have a cached body
if (cachedProxyBody == null) {
bodyIgnored = MinimalFuture.completedFuture(null);
}
HttpConnection c = ex == null ? null : ex.connection();
Response syntheticResponse = new Response(request, this,
proxyResponse.headers, c, proxyResponse.statusCode,
proxyResponse.version, true);
cachedProxyResponse.headers, c, cachedProxyResponse.statusCode,
cachedProxyResponse.version, cachedProxyBody == null); // body ignored only if not cached

return MinimalFuture.completedFuture(syntheticResponse);
} else if (t != null) {
if (debug.on()) debug.log("checkFor407: no response - %s", (Object)t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@

import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.MinimalFuture;

import static java.net.http.HttpResponse.BodyHandlers.discarding;
import static java.net.http.HttpResponse.BodyHandlers.ofByteArray;
import static jdk.internal.net.http.common.Utils.ProxyHeaders;

/**
* A plain text socket tunnel through a proxy. Uses "CONNECT" but does not
* encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy.
* Wrapped in SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
* encrypt. Used by WebSocket, as well as HTTP over SSL + Proxy. Wrapped in
* SSLTunnelConnection or AsyncSSLTunnelConnection for encryption.
*/
final class PlainTunnelingConnection extends HttpConnection {

Expand All @@ -65,65 +67,84 @@ protected PlainTunnelingConnection(Origin originServer,

@Override
public CompletableFuture<Void> connectAsync(Exchange<?> exchange) {
if (debug.on()) debug.log("Connecting plain connection");
if (debug.on()) {
debug.log("Connecting plain connection");
}
return delegate.connectAsync(exchange)
.thenCompose(unused -> delegate.finishConnect())
.thenCompose((Void v) -> {
if (debug.on()) debug.log("sending HTTP/1.1 CONNECT");
HttpClientImpl client = client();
assert client != null;
HttpRequestImpl req = new HttpRequestImpl("CONNECT", address, proxyHeaders);
MultiExchange<Void> mulEx = new MultiExchange<>(null, req,
client, discarding(), null);
Exchange<Void> connectExchange = mulEx.getExchange();

return connectExchange
.responseAsyncImpl(delegate)
.thenCompose((Response resp) -> {
CompletableFuture<Void> cf = new MinimalFuture<>();
if (debug.on()) debug.log("got response: %d", resp.statusCode());
if (resp.statusCode() == 407) {
return connectExchange.ignoreBody().handle((r,t) -> {
// close delegate after reading body: we won't
// be reusing that connection anyway.
.thenCompose(unused -> delegate.finishConnect())
.thenCompose((Void v) -> {
if (debug.on()) {
debug.log("sending HTTP/1.1 CONNECT");
}
HttpClientImpl client = client();
assert client != null;
HttpRequestImpl req = new HttpRequestImpl(
"CONNECT", address, proxyHeaders);
MultiExchange<byte[]> mulEx = new MultiExchange<>(
null, req, client, ofByteArray(), null);
Exchange<byte[]> connectExchange = mulEx.getExchange();

return connectExchange
.responseAsyncImpl(delegate)
.thenCompose((Response resp) -> {
CompletableFuture<Void> cf =
new MinimalFuture<>();
if (debug.on()) {
debug.log("got response: %d",
resp.statusCode());
}
if (resp.statusCode() == 407) {
// Read the 407 body
return connectExchange.readBodyAsync(ofByteArray())
.handle((bodyBytes, t) -> {
// close delegate after reading body: we won't
// be reusing that connection anyway.
delegate.close();
ProxyAuthenticationRequired authenticationRequired =
new ProxyAuthenticationRequired(resp, bodyBytes);
cf.completeExceptionally(authenticationRequired);
return cf;
}).thenCompose(Function.identity());
} else if (resp.statusCode() != 200) {
delegate.close();
ProxyAuthenticationRequired authenticationRequired =
new ProxyAuthenticationRequired(resp);
cf.completeExceptionally(authenticationRequired);
return cf;
}).thenCompose(Function.identity());
} else if (resp.statusCode() != 200) {
delegate.close();
cf.completeExceptionally(new IOException(
"Tunnel failed, got: "+ resp.statusCode()));
} else {
// get the initial/remaining bytes
ByteBuffer b = ((Http1Exchange<?>)connectExchange.exchImpl).drainLeftOverBytes();
int remaining = b.remaining();
assert remaining == 0: "Unexpected remaining: " + remaining;
cf.complete(null);
}
return cf;
})
.handle((result, ex) -> {
if (ex == null) {
return MinimalFuture.completedFuture(result);
} else {
if (debug.on())
debug.log("tunnel failed with \"%s\"", ex.toString());
Throwable t = ex;
if (t instanceof CompletionException)
t = t.getCause();
if (t instanceof HttpTimeoutException) {
String msg = "proxy tunneling CONNECT request timed out";
t = new HttpTimeoutException(msg);
t.initCause(ex);
cf.completeExceptionally(new IOException(
"Tunnel failed, got: "
+ resp.statusCode()));
} else {
// get the initial/remaining bytes
ByteBuffer b = ((Http1Exchange<?>) connectExchange.exchImpl)
.drainLeftOverBytes();
int remaining = b.remaining();
assert remaining == 0
: "Unexpected remaining: " + remaining;
cf.complete(null);
}
return cf;
})
.handle((result, ex) -> {
if (ex == null) {
return MinimalFuture.completedFuture(
result);
} else {
if (debug.on()) {
debug.log("tunnel failed with \"%s\"",
ex.toString());
}
Throwable t = ex;
if (t instanceof CompletionException) {
t = t.getCause();
}
if (t instanceof HttpTimeoutException) {
String msg =
"proxy tunneling CONNECT request timed out";
t = new HttpTimeoutException(msg);
t.initCause(ex);
}
return MinimalFuture.<Void>failedFuture(t);
}
return MinimalFuture.<Void>failedFuture(t);
}
})
.thenCompose(Function.identity());
});
})
.thenCompose(Function.identity());
});
}

public CompletableFuture<Void> finishConnect() {
Expand All @@ -132,10 +153,14 @@ public CompletableFuture<Void> finishConnect() {
}

@Override
boolean isTunnel() { return true; }
boolean isTunnel() {
return true;
}

@Override
HttpPublisher publisher() { return delegate.publisher(); }
HttpPublisher publisher() {
return delegate.publisher();
}

@Override
boolean connected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@
final class ProxyAuthenticationRequired extends IOException {
private static final long serialVersionUID = 0;
final transient Response proxyResponse;
final transient byte[] proxyResponseBody;

/**
* Constructs a {@code ProxyAuthenticationRequired} with the specified detail
* message and cause.
*
* @param proxyResponse the response from the proxy
*/
public ProxyAuthenticationRequired(Response proxyResponse) {
this(proxyResponse, null);
}

public ProxyAuthenticationRequired(Response proxyResponse, byte[] proxyResponseBody) {
super("Proxy Authentication Required");
assert proxyResponse.statusCode() == 407;
this.proxyResponse = proxyResponse;
this.proxyResponseBody = proxyResponseBody;
}
}
Loading