From abb9cb8a51570b724c83daaf874272a61c172690 Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 8 Jul 2025 18:09:10 +0530 Subject: [PATCH 1/8] 8326498: java.net.http.HttpClient connection leak using http/2 --- .../jdk/internal/net/http/Exchange.java | 11 +- .../internal/net/http/Http2ClientImpl.java | 9 +- .../internal/net/http/Http2Connection.java | 494 +++++++++--------- .../net/http/Http2TerminationCause.java | 277 ++++++++++ .../jdk/internal/net/http/HttpConnection.java | 4 +- .../net/httpclient/http2/H2GoAwayTest.java | 6 +- .../internal/net/http/ConnectionPoolTest.java | 8 +- 7 files changed, 537 insertions(+), 272 deletions(-) create mode 100644 src/java.net.http/share/classes/jdk/internal/net/http/Http2TerminationCause.java diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java index c50a4922e80a4..87079521fd5c8 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Exchange.java @@ -31,7 +31,6 @@ import java.time.Duration; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -708,12 +707,12 @@ HttpResponse.BodySubscriber ignoreBody(HttpResponse.ResponseInfo hdrs) { if (s == null) { // s can be null if an exception occurred // asynchronously while sending the preface. - Throwable t = c.getRecordedCause(); + final Throwable terminationException = c.getTerminationException() + .orElse(null); IOException ioe; - if (t != null) { - if (!cached) - c.close(); - ioe = new IOException("Can't get stream 1: " + t, t); + if (terminationException != null) { + ioe = new IOException("Can't get stream 1: " + terminationException, + terminationException); } else { ioe = new IOException("Can't get stream 1"); } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java index cc8a2a7142bfe..6121a0c167357 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2ClientImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -25,7 +25,6 @@ package jdk.internal.net.http; -import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Base64; @@ -234,11 +233,8 @@ void removeFromPool(Http2Connection c) { } } - private EOFException STOPPED; void stop() { if (debug.on()) debug.log("stopping"); - STOPPED = new EOFException("HTTP/2 client stopped"); - STOPPED.setStackTrace(new StackTraceElement[0]); connectionPoolLock.lock(); try { stopping = true; @@ -253,10 +249,7 @@ void stop() { private boolean close(Http2Connection h2c) { // close all streams try { h2c.closeAllStreams(); } catch (Throwable t) {} - // send GOAWAY try { h2c.close(); } catch (Throwable t) {} - // attempt graceful shutdown - try { h2c.shutdown(STOPPED); } catch (Throwable t) {} // double check and close any new streams try { h2c.closeAllStreams(); } catch (Throwable t) {} // Allows for use of removeIf in stop() diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index 63889fa6af2ea..b14c547f743ff 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -28,8 +28,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.VarHandle; import java.net.InetSocketAddress; import java.net.ProtocolException; import java.net.http.HttpClient; @@ -37,18 +35,19 @@ import java.net.http.HttpHeaders; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.net.http.HttpConnectTimeoutException; import java.time.Duration; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +55,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.function.Supplier; + import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLException; @@ -143,6 +143,8 @@ class Http2Connection { private static final int MAX_SERVER_STREAM_ID = Integer.MAX_VALUE - 1; // 2147483646 // may be null; must be accessed/updated with the stateLock held private IdleConnectionTimeoutEvent idleConnectionTimeoutEvent; + private final AtomicBoolean goAwaySent = new AtomicBoolean(); + private final AtomicBoolean goAwayRecvd = new AtomicBoolean(); /** * Flag set when no more streams to be opened on this connection. @@ -215,7 +217,7 @@ final class IdleConnectionTimeoutEvent extends TimeoutEvent { } /** - * {@link #shutdown(Throwable) Shuts down} the connection, unless this event is + * {@link #close(Http2TerminationCause) Closes} the connection, unless this event is * {@link #cancelled} */ @Override @@ -228,26 +230,20 @@ public void handle() { try { if (cancelled) { if (debug.on()) { - debug.log("Not initiating idle connection shutdown"); - } - return; - } - if (!markIdleShutdownInitiated()) { - if (debug.on()) { - debug.log("Unexpected state %s, skipping idle connection shutdown", - describeClosedState(closedState)); + debug.log("Not initiating idle connection close"); } return; } + // the connection has been idle long enough, we now + // mark a state indicating that the connection is chosen + // for idle termination and should not be handed out (from the pool) + // for newer requests. + connTerminator.markForIdleTermination(); } finally { stateLock.unlock(); } - if (debug.on()) { - debug.log("Initiating shutdown of HTTP connection which is idle for too long"); - } - HttpConnectTimeoutException hte = new HttpConnectTimeoutException( - "HTTP connection idle, no active streams. Shutting down."); - shutdown(hte); + // terminate the connection due to being idle long enough + connTerminator.idleTimedOut(); } /** @@ -256,7 +252,7 @@ public void handle() { void cancel() { assert stateLock.isHeldByCurrentThread() : "Current thread doesn't hold " + stateLock; // mark as cancelled to prevent potentially already triggered event from actually - // doing the shutdown + // doing the close this.cancelled = true; // cancel the timer to prevent the event from being triggered (if it hasn't already) client().cancelTimer(this); @@ -376,16 +372,7 @@ public void onMaxHeaderListSizeReached(long size, int maxHeaderListSize) throws } - private static final int HALF_CLOSED_LOCAL = 1; - private static final int HALF_CLOSED_REMOTE = 2; - private static final int SHUTDOWN_REQUESTED = 4; - // state when idle connection management initiates a shutdown of the connection, after - // which the connection will go into SHUTDOWN_REQUESTED state - private static final int IDLE_SHUTDOWN_INITIATED = 8; private final ReentrantLock stateLock = new ReentrantLock(); - private volatile int closedState; - - //------------------------------------- final HttpConnection connection; private final Http2ClientImpl client2; private final ConcurrentHashMap> streams = new ConcurrentHashMap<>(); @@ -403,7 +390,9 @@ public void onMaxHeaderListSizeReached(long size, int maxHeaderListSize) throws private final Decoder hpackIn; final SettingsFrame clientSettings; private volatile SettingsFrame serverSettings; + private record PushContinuationState(PushPromiseDecoder pushContDecoder, PushPromiseFrame pushContFrame) {} + private volatile PushContinuationState pushContinuationState; private final String key; // for HttpClientImpl.connections map private final FramesDecoder framesDecoder; @@ -418,7 +407,7 @@ private record PushContinuationState(PushPromiseDecoder pushContDecoder, PushPro private final FramesController framesController = new FramesController(); private final Http2TubeSubscriber subscriber; final ConnectionWindowUpdateSender windowUpdater; - private final AtomicReference cause = new AtomicReference<>(); + private final Terminator connTerminator = new Terminator(); private volatile Supplier initial; private volatile Stream initialStream; @@ -640,35 +629,31 @@ private boolean reserveStream0(boolean clientInitiated, boolean pushEnabled) thr } void abandonStream() { - boolean shouldClose = false; stateLock.lock(); try { long reserved = --numReservedClientStreams; assert reserved >= 0; - if (finalStream && reserved == 0 && streams.isEmpty()) { - shouldClose = true; - } } catch (Throwable t) { - shutdown(t); // in case the assert fires... + close(Http2TerminationCause.forException(t)); // in case the assert fires... } finally { stateLock.unlock(); } - // We should close the connection here if - // it's not pooled. If it's not pooled it will - // be marked final stream, reserved will be 0 - // after decrementing it by one, and there should - // be no active request-response streams. - if (shouldClose) { - shutdown(new IOException("HTTP/2 connection abandoned")); + // if the connection is eligible to be closed, we close it here + if (shouldClose()) { + close(Http2TerminationCause.noErrorTermination()); } - } - boolean shouldClose() { + /* + * return true if the connection is marked as "final stream" and there + * are no active streams on that connection and the connection isn't + * reserved for a new stream. + */ + final boolean shouldClose() { stateLock.lock(); try { - return finalStream() && streams.isEmpty(); + return finalStream() && streams.isEmpty() && numReservedClientStreams == 0; } finally { stateLock.unlock(); } @@ -840,22 +825,8 @@ final int maxConcurrentServerInitiatedStreams() { return clientSettings.getParameter(MAX_CONCURRENT_STREAMS); } - void close() { - if (markHalfClosedLocal()) { - // we send a GOAWAY frame only if the remote side hasn't already indicated - // the intention to close the connection by previously sending a GOAWAY of its own - if (connection.channel().isOpen() && !isMarked(closedState, HALF_CLOSED_REMOTE)) { - Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address()); - GoAwayFrame f = new GoAwayFrame(0, - ErrorFrame.NO_ERROR, - "Requested by user".getBytes(UTF_8)); - // TODO: set last stream. For now zero ok. - sendFrame(f); - } - } - } - long count; + final void asyncReceive(ByteBuffer buffer) { // We don't need to read anything and // we don't want to send anything back to the server @@ -904,44 +875,39 @@ final void asyncReceive(ByteBuffer buffer) { } catch (Throwable e) { String msg = Utils.stackTrace(e); Log.logTrace(msg); - shutdown(e); + close(Http2TerminationCause.forException(e)); } } - Throwable getRecordedCause() { - return cause.get(); + /** + * Returns the exception which caused the connection to be terminated. If the connection + * hasn't yet been terminated or if the connection was terminated normally (without any + * exception) then this method returns an empty Optional. + */ + final Optional getTerminationException() { + final Http2TerminationCause terminationCause = this.connTerminator.getTerminationCause(); + if (terminationCause == null) { + // connection isn't terminated + return Optional.empty(); + } + return Optional.of(terminationCause.getCloseCause()); } - void shutdown(Throwable t) { - int state = closedState; - if (debug.on()) debug.log(() -> "Shutting down h2c (state="+describeClosedState(state)+"): " + t); - stateLock.lock(); - try { - if (!markShutdownRequested()) return; - cause.compareAndSet(null, t); - } finally { - stateLock.unlock(); - } + /** + * Closes the connection normally (with a NO_ERROR termination cause), if not already closed. + */ + final void close() { + close(Http2TerminationCause.noErrorTermination()); + } - if (Log.errors()) { - if (t!= null && (!(t instanceof EOFException) || isActive())) { - Log.logError(t); - } else if (t != null) { - Log.logError("Shutting down connection: {0}", t.getMessage()); - } else { - Log.logError("Shutting down connection"); - } - } - client2.removeFromPool(this); - subscriber.stop(cause.get()); - for (Stream s : streams.values()) { - try { - s.connectionClosing(t); - } catch (Throwable e) { - Log.logError("Failed to close stream {0}: {1}", s.streamid, e); - } - } - connection.close(cause.get()); + /** + * Closes the connection with the given termination cause, if not already closed. + * + * @param tc the termination cause. cannot be null. + */ + final void close(final Http2TerminationCause tc) { + Objects.requireNonNull(tc, "termination cause cannot be null"); + this.connTerminator.terminate(tc); } /** @@ -981,15 +947,14 @@ void processFrame(Http2Frame frame) throws IOException { } else { if (frame instanceof SettingsFrame) { // The stream identifier for a SETTINGS frame MUST be zero - framesDecoder.close( + protocolError(ErrorFrame.PROTOCOL_ERROR, "The stream identifier for a SETTINGS frame MUST be zero"); - protocolError(GoAwayFrame.PROTOCOL_ERROR); return; } if (frame instanceof PushPromiseFrame && !serverPushEnabled()) { String protocolError = "received a PUSH_PROMISE when SETTINGS_ENABLE_PUSH is 0"; - protocolError(ResetFrame.PROTOCOL_ERROR, protocolError); + protocolError(ErrorFrame.PROTOCOL_ERROR, protocolError); return; } @@ -1144,7 +1109,9 @@ final void releaseUnconsumed(DataFrame df) { // Otherwise, if the frame is dropped after having been added to the // inputQ, releaseUnconsumed above should be called. final void dropDataFrame(DataFrame df) { - if (isMarked(closedState, SHUTDOWN_REQUESTED)) return; + if (!isOpen()) { + return; + } if (debug.on()) { debug.log("Dropping data frame for stream %d (%d payload bytes)", df.streamid(), df.payloadLength()); @@ -1154,7 +1121,9 @@ final void dropDataFrame(DataFrame df) { final void ensureWindowUpdated(DataFrame df) { try { - if (isMarked(closedState, SHUTDOWN_REQUESTED)) return; + if (!isOpen()) { + return; + } int length = df.payloadLength(); if (length > 0) { windowUpdater.update(length); @@ -1251,12 +1220,12 @@ private void handleConnectionFrame(Http2Frame frame) case AltSvcFrame.TYPE -> processAltSvcFrame(0, (AltSvcFrame) frame, connection, connection.client()); - default -> protocolError(ErrorFrame.PROTOCOL_ERROR); + default -> protocolError(ErrorFrame.PROTOCOL_ERROR, "unknown frame: " + frame); } } - boolean isOpen() { - return !isMarkedForShutdown() && connection.channel().isOpen(); + final boolean isOpen() { + return this.connTerminator.terminationCause.get() == null; } void resetStream(int streamid, int code) { @@ -1295,6 +1264,7 @@ void decrementStreamsCount(int streamid) { } } + private void decrementStreamsCount0(int streamid) { Stream s = streams.get(streamid); if (s == null || !s.deRegister()) @@ -1346,8 +1316,7 @@ void closeStream(int streamid) { // corresponding entry in the window controller. windowController.removeStream(streamid); } - if (finalStream() && streams.isEmpty()) { - // should be only 1 stream, but there might be more if server push + if (shouldClose()) { close(); } else { // Start timer if property present and not already created @@ -1372,9 +1341,7 @@ void closeStream(int streamid) { /** * Increments this connection's send Window by the amount in the given frame. */ - private void handleWindowUpdate(WindowUpdateFrame f) - throws IOException - { + private void handleWindowUpdate(WindowUpdateFrame f) { int amount = f.getUpdate(); if (amount <= 0) { // ## temporarily disable to workaround a bug in Jetty where it @@ -1383,37 +1350,19 @@ private void handleWindowUpdate(WindowUpdateFrame f) } else { boolean success = windowController.increaseConnectionWindow(amount); if (!success) { - protocolError(ErrorFrame.FLOW_CONTROL_ERROR); // overflow + protocolError(ErrorFrame.FLOW_CONTROL_ERROR, null); // overflow } } } - private void protocolError(int errorCode) - throws IOException - { - protocolError(errorCode, null); + private void protocolError(final int errorCode, final String msg) { + final Http2TerminationCause terminationCause = + Http2TerminationCause.forH2Error(errorCode, msg); + framesDecoder.close(terminationCause.getLogMsg()); + close(terminationCause); } - private void protocolError(int errorCode, String msg) - throws IOException - { - String protocolError = "protocol error" + (msg == null?"":(": " + msg)); - ProtocolException protocolException = - new ProtocolException(protocolError); - this.cause.compareAndSet(null, protocolException); - if (markHalfClosedLocal()) { - framesDecoder.close(protocolError); - subscriber.stop(protocolException); - if (debug.on()) debug.log("Sending GOAWAY due to " + protocolException); - GoAwayFrame frame = new GoAwayFrame(0, errorCode); - sendFrame(frame); - } - shutdown(protocolException); - } - - private void handleSettings(SettingsFrame frame) - throws IOException - { + private void handleSettings(SettingsFrame frame) { assert frame.streamid() == 0; if (!frame.getFlag(SettingsFrame.ACK)) { int newWindowSize = frame.getParameter(INITIAL_WINDOW_SIZE); @@ -1430,9 +1379,7 @@ private void handleSettings(SettingsFrame frame) } } - private void handlePing(PingFrame frame) - throws IOException - { + private void handlePing(PingFrame frame) { frame.setFlag(PingFrame.ACK); sendUnorderedFrame(frame); } @@ -1442,7 +1389,7 @@ private void handleGoAway(final GoAwayFrame frame) { assert lastProcessedStream >= 0 : "unexpected last stream id: " + lastProcessedStream + " in GOAWAY frame"; - markHalfClosedRemote(); + goAwayRecvd.set(true); setFinalStream(); // don't allow any new streams on this connection if (debug.on()) { debug.log("processing incoming GOAWAY with last processed stream id:%s in frame %s", @@ -1599,20 +1546,20 @@ boolean tryReserveForPoolCheckout() { // must be done with "stateLock" held to co-ordinate idle connection management stateLock.lock(); try { - cancelIdleShutdownEvent(); - // consider the reservation successful only if the connection's state hasn't moved - // to "being closed" - return isOpen(); + cancelIdleCloseEvent(); + // consider the reservation successful only if the connection is open and + // hasn't been chosen for idle termination + return !this.connTerminator.isMarkedForIdleTermination() && isOpen(); } finally { stateLock.unlock(); } } /** - * Cancels any event that might have been scheduled to shutdown this connection. Must be called + * Cancels any event that might have been scheduled to close this connection. Must be called * with the stateLock held. */ - private void cancelIdleShutdownEvent() { + private void cancelIdleCloseEvent() { assert stateLock.isHeldByCurrentThread() : "Current thread doesn't hold " + stateLock; if (idleConnectionTimeoutEvent == null) { return; @@ -1627,20 +1574,23 @@ void putStream(Stream stream, int streamid) { // the stream is closed. stateLock.lock(); try { - if (!isMarkedForShutdown()) { + if (isOpen() && !this.connTerminator.isMarkedForIdleTermination()) { if (debug.on()) { debug.log("Opened stream %d", streamid); } client().streamReference(); streams.put(streamid, stream); - cancelIdleShutdownEvent(); + // don't consider the connection idle anymore + cancelIdleCloseEvent(); return; } } finally { stateLock.unlock(); } if (debug.on()) debug.log("connection closed: closing stream %d", stream); - stream.cancel(new IOException("Stream " + streamid + " cancelled", cause.get())); + final Http2TerminationCause terminationCause = this.connTerminator.getTerminationCause(); + assert terminationCause != null : "termination cause is null"; + stream.cancel(new IOException("Stream " + streamid + " cancelled", terminationCause.getCloseCause())); } /** @@ -1743,11 +1693,10 @@ private Stream registerNewStream(OutgoingHeaders> oh) { int streamid = nextstreamid; Throwable cause = null; synchronized (this) { - if (isMarked(closedState, SHUTDOWN_REQUESTED)) { - cause = this.cause.get(); - if (cause == null) { - cause = new IOException("Connection closed"); - } + if (!isOpen()) { + final Http2TerminationCause terminationCause = this.connTerminator.getTerminationCause(); + assert terminationCause != null : "termination cause is null"; + cause = terminationCause.getCloseCause(); } } if (cause != null) { @@ -1762,7 +1711,7 @@ private Stream registerNewStream(OutgoingHeaders> oh) { return stream; } else { stream.cancelImpl(new IOException("Request cancelled")); - if (finalStream() && streams.isEmpty()) { + if (shouldClose()) { close(); } return null; @@ -1795,14 +1744,12 @@ void sendFrame(Http2Frame frame) { } publisher.signalEnqueued(); } catch (IOException e) { - if (!isMarked(closedState, SHUTDOWN_REQUESTED)) { - if (!client2.stopping()) { - Log.logError(e); - shutdown(e); - } else if (debug.on()) { - debug.log("Failed to send %s while stopping: %s", frame, e); - } + if (!client2.stopping()) { + Log.logError(e); + } else if (debug.on()) { + debug.log("Failed to send %s while stopping: %s", frame, e); } + close(Http2TerminationCause.forException(e)); } } @@ -1817,14 +1764,12 @@ void sendDataFrame(DataFrame frame) { publisher.enqueue(encodeFrame(frame)); publisher.signalEnqueued(); } catch (IOException e) { - if (!isMarked(closedState, SHUTDOWN_REQUESTED)) { - if (!client2.stopping()) { - Log.logError(e); - shutdown(e); - } else if (debug.on()) { - debug.log("Failed to send %s while stopping: %s", frame, e); - } + if (!client2.stopping()) { + Log.logError(e); + } else if (debug.on()) { + debug.log("Failed to send %s while stopping: %s", frame, e); } + close(Http2TerminationCause.forException(e)); } } @@ -1839,10 +1784,12 @@ void sendUnorderedFrame(Http2Frame frame) { publisher.enqueueUnordered(encodeFrame(frame)); publisher.signalEnqueued(); } catch (IOException e) { - if (!isMarked(closedState, SHUTDOWN_REQUESTED)) { + if (!client2.stopping()) { Log.logError(e); - shutdown(e); + } else if (debug.on()) { + debug.log("Failed to send %s while stopping: %s", frame, e); } + close(Http2TerminationCause.forException(e)); } } @@ -1868,6 +1815,7 @@ final void processQueue() { try { while (!queue.isEmpty() && !scheduler.isStopped()) { ByteBuffer buffer = queue.poll(); + assert buffer != null : "null buffer obtained from non-empty queue"; if (debug.on()) debug.log("sending %d to Http2Connection.asyncReceive", buffer.remaining()); @@ -1877,18 +1825,13 @@ final void processQueue() { errorRef.compareAndSet(null, t); } finally { Throwable x = errorRef.get(); - if (x != null) { - scheduler.stop(); - if (client2.stopping()) { - if (debug.on()) { - debug.log("Stopping scheduler"); - } - } else { - if (debug.on()) { - debug.log("Stopping scheduler", x); - } - } - Http2Connection.this.shutdown(x); + // if there was any error or if the TubeSubscriber completed normally, + // then close the connection + if (x != null || completed) { + final Http2TerminationCause tc = (x != null) + ? Http2TerminationCause.forException(x) + : Http2TerminationCause.noErrorTermination(); + Http2Connection.this.close(tc); } } } @@ -1938,11 +1881,17 @@ public void onError(Throwable throwable) { @Override public void onComplete() { if (completed) return; - String msg = isActive() - ? "EOF reached while reading" - : "Idle connection closed by HTTP/2 peer"; - if (debug.on()) debug.log(msg); - errorRef.compareAndSet(null, new EOFException(msg)); + if (isActive()) { + final String msg = "EOF reached while reading"; + errorRef.compareAndSet(null, new EOFException(msg)); + if (debug.on()) { + debug.log(msg); + } + } else { + if (debug.on()) { + debug.log("HTTP/2 connection (with no active streams) closed by peer"); + } + } completed = true; runOrSchedule(); } @@ -1955,16 +1904,13 @@ public void dropSubscription() { dropped = true; } - void stop(Throwable error) { - if (errorRef.compareAndSet(null, error)) { - completed = true; - scheduler.stop(); - queue.clear(); - if (subscription != null) { - subscription.cancel(); - } - queue.clear(); + private void close() { + scheduler.stop(); + queue.clear(); + if (subscription != null) { + subscription.cancel(); } + queue.clear(); } } @@ -1990,6 +1936,7 @@ final String dbgString() { static final class ConnectionWindowUpdateSender extends WindowUpdateSender { final int initialWindowSize; + public ConnectionWindowUpdateSender(Http2Connection connection, int initialWindowSize) { super(connection, initialWindowSize); @@ -2004,13 +1951,9 @@ int getStreamId() { @Override protected boolean windowSizeExceeded(long received) { if (connection.isOpen()) { - try { - connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR, - "connection window exceeded (%s > %s)" - .formatted(received, windowSize)); - } catch (IOException io) { - connection.shutdown(io); - } + connection.protocolError(ErrorFrame.FLOW_CONTROL_ERROR, + "connection window exceeded (%s > %s)" + .formatted(received, windowSize)); } return true; } @@ -2033,72 +1976,117 @@ AbstractAsyncSSLConnection getConnection() { } } - private boolean isMarked(int state, int mask) { - return (state & mask) == mask; - } - - private boolean isMarkedForShutdown() { - final int closedSt = closedState; - return isMarked(closedSt, IDLE_SHUTDOWN_INITIATED) - || isMarked(closedSt, SHUTDOWN_REQUESTED); - } - - private boolean markShutdownRequested() { - return markClosedState(SHUTDOWN_REQUESTED); + private void sendGoAway(final GoAwayFrame goAway) { + // currently we send a GOAWAY just once irrespective of what value the + // last stream id was in the GOAWAY frame + if (!goAwaySent.compareAndSet(false, true)) { + // already sent + return; + } + Log.logTrace("{0} sending GOAWAY {1}", connection, goAway); + if (debug.on()) { + debug.log("sending GOAWAY " + goAway); + } + // this merely enqueues the frame + sendFrame(goAway); } - private boolean markHalfClosedLocal() { - return markClosedState(HALF_CLOSED_LOCAL); - } + // Responsible for doing all the necessary work for closing a Http2Connection + private final class Terminator { + // the cause for closing the connection. Must only be set in the + // Terminator.terminate(Http2TerminationCause) method. + private final AtomicReference terminationCause = new AtomicReference<>(); + // true if it has been decided to terminate the connection due to being idle, + // false otherwise. should be accessed only when holding the stateLock + private boolean chosenForIdleTermination; - private boolean markHalfClosedRemote() { - return markClosedState(HALF_CLOSED_REMOTE); - } + private void terminate(final Http2TerminationCause terminationCause) { + Objects.requireNonNull(terminationCause, "termination cause cannot be null"); + // allow to be terminated only once + stateLock.lock(); + try { + final boolean success = this.terminationCause.compareAndSet(null, terminationCause); + if (!success) { + // already terminated or is being terminated by some other thread + return; + } + // disable the idle timeout event, since we are now going to terminate the + // connection + Http2Connection.this.cancelIdleCloseEvent(); + } finally { + stateLock.unlock(); + } + // do the actual termination + doTerminate(); + } - private boolean markIdleShutdownInitiated() { - return markClosedState(IDLE_SHUTDOWN_INITIATED); - } + private void doTerminate() { + final Http2TerminationCause tc = terminationCause.get(); + assert tc != null : "missing termination cause"; + // we send a GOAWAY frame only if the remote side hasn't already indicated + // the intention to close the connection by previously sending a GOAWAY of its own + if (!Http2Connection.this.goAwayRecvd.get()) { + final int lastStream = 0; // TODO: set last stream. For now zero is ok. + final String peerVisibleReason = tc.getPeerVisibleReason(); + final GoAwayFrame goAway; + if (peerVisibleReason == null) { + goAway = new GoAwayFrame(lastStream, tc.getCloseCode()); + } else { + goAway = new GoAwayFrame(lastStream, tc.getCloseCode(), + peerVisibleReason.getBytes(UTF_8)); + } + sendGoAway(goAway); + } + // now close the connection - private boolean markClosedState(int flag) { - int state, desired; - do { - state = desired = closedState; - if ((state & flag) == flag) return false; - desired = state | flag; - } while (!CLOSED_STATE.compareAndSet(this, state, desired)); - return true; - } + if (Log.errors()) { + Log.logError("Closing connection due to: {0}", tc); + } else { + if (debug.on()) { + final String stateStr = "Erroneous close=" + tc.isErroneousClose() + + ", has active streams=" + isActive() + + ", GOAWAY received=" + goAwayRecvd.get() + + ", GOAWAY sent=" + goAwaySent.get(); + debug.log("Closing connection (" + stateStr + ") due to: " + tc); + } + } - String describeClosedState(int state) { - if (state == 0) return "active"; - String desc = null; - if (isMarked(state, IDLE_SHUTDOWN_INITIATED)) { - desc = "idle-shutdown-initiated"; - } - if (isMarked(state, SHUTDOWN_REQUESTED)) { - desc = desc == null ? "shutdown" : desc + "+shutdown"; + client2.removeFromPool(Http2Connection.this); + // close the TubeSubscriber + subscriber.close(); + // notify the HTTP/2 streams of the connection closure + for (final Stream s : streams.values()) { + try { + s.connectionClosing(tc.getCloseCause()); + } catch (Throwable e) { + Log.logError("Failed to close stream {0}: {1}", s.streamid, e); + } + } + // close the underlying connection + connection.close(tc.getCloseCause()); } - if (isMarked(state, HALF_CLOSED_LOCAL | HALF_CLOSED_REMOTE)) { - if (desc == null) return "closed"; - else return desc + "+closed"; + + private void markForIdleTermination() { + assert stateLock.isHeldByCurrentThread() : Thread.currentThread() + + " not holding stateLock"; + this.chosenForIdleTermination = true; } - if (isMarked(state, HALF_CLOSED_LOCAL)) { - if (desc == null) return "half-closed-local"; - else return desc + "+half-closed-local"; + + private boolean isMarkedForIdleTermination() { + assert stateLock.isHeldByCurrentThread() : Thread.currentThread() + + " not holding stateLock"; + return this.chosenForIdleTermination; } - if (isMarked(state, HALF_CLOSED_REMOTE)) { - if (desc == null) return "half-closed-remote"; - else return desc + "+half-closed-remote"; + + private void idleTimedOut() { + if (debug.on()) { + debug.log("closing connection due to being idle"); + } + this.terminate(Http2TerminationCause.idleTimedOut()); } - return "0x" + Integer.toString(state, 16); - } - private static final VarHandle CLOSED_STATE; - static { - try { - CLOSED_STATE = MethodHandles.lookup().findVarHandle(Http2Connection.class, "closedState", int.class); - } catch (Exception x) { - throw new ExceptionInInitializerError(x); + private Http2TerminationCause getTerminationCause() { + return this.terminationCause.get(); } } } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2TerminationCause.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2TerminationCause.java new file mode 100644 index 0000000000000..379db5b575f12 --- /dev/null +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2TerminationCause.java @@ -0,0 +1,277 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ +package jdk.internal.net.http; + + +import java.io.IOException; +import java.net.ProtocolException; +import java.util.Objects; + +import jdk.internal.net.http.frame.ErrorFrame; + +/** + * Termination cause for a {@linkplain Http2Connection HTTP/2 connection} + */ +public abstract sealed class Http2TerminationCause { + private String logMsg; + private String peerVisibleReason; + private final int closeCode; + private final Throwable originalCause; + private final IOException reportedCause; + + private Http2TerminationCause(final int closeCode, final Throwable closeCause) { + this.closeCode = closeCode; + this.originalCause = closeCause; + if (closeCause != null) { + this.logMsg = closeCause.toString(); + } + this.reportedCause = toReportedCause(this.originalCause, this.logMsg); + } + + private Http2TerminationCause(final int closeCode, final String loggedAs) { + this.closeCode = closeCode; + this.originalCause = null; + this.logMsg = loggedAs; + this.reportedCause = toReportedCause(null, this.logMsg); + } + + /** + * Returns the error code (specified for HTTP/2 ErrorFrame) that caused the + * connection termination. + */ + public final int getCloseCode() { + return this.closeCode; + } + + /** + * Returns the IOException that is considered the cause of the connection termination. + * Even a normal {@linkplain #isErroneousClose() non-erroneous} termination will have + * a IOException associated with it, so this method will always return a non-null instance. + */ + public final IOException getCloseCause() { + return this.reportedCause; + } + + /** + * Returns {@code true} if the connection was terminated due to some exception. {@code false} + * otherwise. + * A normal connection termination (for example, the connection idle timing out locally) + * is not considered as an erroneous termination and this method returns {@code false} for + * such cases. + */ + public abstract boolean isErroneousClose(); + + /** + * Returns the connection termination cause, represented as a string. Unlike the + * {@linkplain #getPeerVisibleReason() peer-visible reason}, this log message will not be + * sent across to the peer and it is thus allowed to include additional details that might + * help debugging a connection termination. + */ + public final String getLogMsg() { + return logMsg; + } + + /** + * Returns the connection termination cause, represented as a string. This represents the + * "debugData" that is sent to the peer in a + * {@linkplain jdk.internal.net.http.frame.GoAwayFrame GOAWAY frame}. + */ + public final String getPeerVisibleReason() { + return this.peerVisibleReason; + } + + /** + * Sets the connection termination cause, represented as a string, which will be sent + * to the peer in a {@linkplain jdk.internal.net.http.frame.GoAwayFrame GOAWAY frame}. + * Unlike the {@link #getLogMsg() log message}, + * it is expected that this peer-visible reason will not contain anything that is not meant + * to be viewed by the peer. + */ + protected final void setPeerVisibleReason(final String reasonPhrase) { + this.peerVisibleReason = reasonPhrase; + } + + /** + * Returns a connection termination cause that represents an + * {@linkplain #isErroneousClose() erroneous} termination due to the given {@code cause}. + * + * @param cause the termination cause, cannot be null. + */ + public static Http2TerminationCause forException(final Throwable cause) { + Objects.requireNonNull(cause); + if (cause instanceof ProtocolException pe) { + return new ProtocolError(pe); + } + return new InternalError(cause); + } + + /** + * Returns a connection termination cause that represents a normal + * {@linkplain #isErroneousClose() non-erroneous} termination. + */ + public static Http2TerminationCause noErrorTermination() { + return NoError.INSTANCE; + } + + /** + * Returns a connection termination cause that represents a normal + * {@linkplain #isErroneousClose() non-erroneous} termination due to the connection + * being idle. + */ + public static Http2TerminationCause idleTimedOut() { + return new NoError("HTTP/2 connection idle timed out", "idle timed out"); + } + + /** + * Returns a connection termination cause that represents an + * {@linkplain #isErroneousClose() erroneous} termination due to the given {@code errorCode}. + * Although this method does no checks for the {@code errorCode}, it is expected to be one + * of the error codes specified by the HTTP/2 RFC for the ErrorFrame. + * + * @param errorCode the error code + * @param loggedAs optional log message to be associated with this termination cause + */ + public static Http2TerminationCause forH2Error(final int errorCode, final String loggedAs) { + if (errorCode == ErrorFrame.PROTOCOL_ERROR) { + return new ProtocolError(loggedAs); + } else if (errorCode == ErrorFrame.FLOW_CONTROL_ERROR) { + // we treat flow control error as a protocol error currently + return new ProtocolError(loggedAs, true); + } + return new H2StandardError(errorCode, loggedAs); + } + + private static IOException toReportedCause(final Throwable original, + final String fallbackExceptionMsg) { + if (original == null) { + return fallbackExceptionMsg == null + ? new IOException("connection terminated") + : new IOException(fallbackExceptionMsg); + } else if (original instanceof IOException ioe) { + return ioe; + } else { + return new IOException(original); + } + } + + private static final class NoError extends Http2TerminationCause { + private static final IOException NO_ERROR_MARKER = + new IOException("HTTP/2 connection closed normally - no error"); + + static { + // remove the stacktrace from this marker exception instance + NO_ERROR_MARKER.setStackTrace(new StackTraceElement[0]); + } + + private static final NoError INSTANCE = new NoError(); + + private NoError() { + super(ErrorFrame.NO_ERROR, NO_ERROR_MARKER); + setPeerVisibleReason("no error"); + } + + private NoError(final String loggedAs, final String peerVisibleReason) { + super(ErrorFrame.NO_ERROR, loggedAs); + if (peerVisibleReason != null) { + setPeerVisibleReason(peerVisibleReason); + } + } + + @Override + public boolean isErroneousClose() { + return false; + } + + @Override + public String toString() { + return "No error - normal termination"; + } + } + + private static sealed class H2StandardError extends Http2TerminationCause { + private H2StandardError(final int errCode, final String msg) { + super(errCode, msg); + setPeerVisibleReason(ErrorFrame.stringForCode(errCode)); + } + + private H2StandardError(final int errCode, final Throwable cause) { + super(errCode, cause); + setPeerVisibleReason(ErrorFrame.stringForCode(errCode)); + } + + @Override + public boolean isErroneousClose() { + return getCloseCode() != ErrorFrame.NO_ERROR; + } + + @Override + public String toString() { + return ErrorFrame.stringForCode(this.getCloseCode()); + } + } + + private static final class ProtocolError extends H2StandardError { + private ProtocolError(final String msg) { + this(msg, false); + } + + private ProtocolError(final String msg, final boolean flowControlError) { + super(flowControlError + ? ErrorFrame.FLOW_CONTROL_ERROR + : ErrorFrame.PROTOCOL_ERROR, + new ProtocolException(msg)); + } + + private ProtocolError(final ProtocolException pe) { + super(ErrorFrame.PROTOCOL_ERROR, pe); + } + + @Override + public boolean isErroneousClose() { + return true; + } + + @Override + public String toString() { + return "Protocol error - " + this.getLogMsg(); + } + } + + private static final class InternalError extends Http2TerminationCause { + private InternalError(final Throwable cause) { + super(ErrorFrame.INTERNAL_ERROR, new Exception(cause)); + } + + @Override + public boolean isErroneousClose() { + return true; + } + + @Override + public String toString() { + return "Internal error - " + this.getLogMsg(); + } + } +} diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java index 0219b0960d767..115bc56f804ee 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpConnection.java @@ -540,9 +540,7 @@ public List getSNIServerNames() { * Closes this connection due to the given cause. * @param cause the cause for which the connection is closed, may be null */ - void close(Throwable cause) { - close(); - } + abstract void close(Throwable cause); /** * {@return the underlying connection flow, if applicable} diff --git a/test/jdk/java/net/httpclient/http2/H2GoAwayTest.java b/test/jdk/java/net/httpclient/http2/H2GoAwayTest.java index 755bb2e16cc11..c5db99ce16484 100644 --- a/test/jdk/java/net/httpclient/http2/H2GoAwayTest.java +++ b/test/jdk/java/net/httpclient/http2/H2GoAwayTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -250,11 +250,15 @@ public void testUnprocessedRaisesExceptionAsync() throws Throwable { } catch (ExecutionException ee) { final Throwable cause = ee.getCause(); if (!(cause instanceof IOException ioe)) { + System.err.println("unexpected exception: " + cause + + ", for request " + REQ_URI_BASE + reqQueryPart); throw cause; } // verify it failed for the right reason if (ioe.getMessage() == null || !ioe.getMessage().contains("request not processed by peer")) { + System.err.println("unexpected exception message: " + ioe.getMessage() + + ", for request " + REQ_URI_BASE + reqQueryPart); // propagate the original failure throw ioe; } diff --git a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java index 27d2b98a34d3c..90cac20dbfdc4 100644 --- a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java +++ b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/ConnectionPoolTest.java @@ -501,9 +501,15 @@ long newConnectionId(HttpClientImpl client) { @Override SocketChannel channel() {return channel;} @Override public void close() { + this.close(null); + } + + @Override + void close(final Throwable cause) { closed=finished=true; - System.out.println("closed: " + this); + System.out.println("closed: " + this + " cause: " + cause); } + @Override public String toString() { return "HttpConnectionStub: " + address + " proxy: " + proxy; From b8e2866fee5a1c80cc11f785297e190b78f77ea8 Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Sun, 2 Nov 2025 13:17:52 +0530 Subject: [PATCH 2/8] introduce a test to reproduce the leak and verify the fix --- .../httpclient/http2/BurstyRequestsTest.java | 246 ++++++++++++++++++ .../lib/http2/Http2TestServerConnection.java | 9 +- 2 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java diff --git a/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java b/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java new file mode 100644 index 0000000000000..19df990c6215d --- /dev/null +++ b/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import jdk.httpclient.test.lib.http2.BodyOutputStream; +import jdk.httpclient.test.lib.http2.Http2Handler; +import jdk.httpclient.test.lib.http2.Http2TestExchange; +import jdk.httpclient.test.lib.http2.Http2TestExchangeSupplier; +import jdk.httpclient.test.lib.http2.Http2TestServer; +import jdk.httpclient.test.lib.http2.Http2TestServerConnection; +import jdk.internal.net.http.common.HttpHeadersBuilder; +import jdk.test.lib.net.SimpleSSLContext; +import jdk.test.lib.net.URIBuilder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import static java.net.http.HttpClient.Builder.NO_PROXY; +import static java.net.http.HttpClient.Version.HTTP_2; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/* + * @test + * @bug 8326498 8361091 + * @summary verify that the HttpClient does not leak connections when dealing with + * sudden rush of HTTP/2 requests + * @modules java.net.http/jdk.internal.net.http:+open + * java.net.http/jdk.internal.net.http.common + * java.net.http/jdk.internal.net.http.frame + * java.net.http/jdk.internal.net.http.hpack + * java.net.http/jdk.internal.net.http.quic + * java.net.http/jdk.internal.net.http.quic.packets + * java.net.http/jdk.internal.net.http.quic.frames + * java.net.http/jdk.internal.net.http.quic.streams + * java.net.http/jdk.internal.net.http.http3.streams + * java.net.http/jdk.internal.net.http.http3.frames + * java.net.http/jdk.internal.net.http.http3 + * java.net.http/jdk.internal.net.http.qpack + * java.net.http/jdk.internal.net.http.qpack.readers + * java.net.http/jdk.internal.net.http.qpack.writers + * java.logging + * java.base/jdk.internal.net.quic + * java.base/jdk.internal.util + * java.base/sun.net.www.http + * java.base/sun.net.www + * java.base/sun.net + * + * @library /test/lib /test/jdk/java/net/httpclient/lib + * @build jdk.test.lib.net.SimpleSSLContext + * jdk.httpclient.test.lib.http2.Http2TestServer + * jdk.httpclient.test.lib.http2.Http2Handler + * jdk.httpclient.test.lib.http2.Http2TestExchange + * jdk.httpclient.test.lib.http2.Http2TestExchangeSupplier + * @run junit ${test.main.class} + */ +class BurstyRequestsTest { + + private static final String HANDLER_PATH = "/8326498/"; + + private static Field openConnections; // jdk.internal.net.http.HttpClientImpl#openedConnections + + private static SSLContext sslContext; + private static Http2TestServer h2server; + + @BeforeAll + static void beforeAll() throws Exception { + openConnections = Class.forName("jdk.internal.net.http.HttpClientImpl") + .getDeclaredField("openedConnections"); + openConnections.setAccessible(true); + + sslContext = new SimpleSSLContext().get(); + h2server = new Http2TestServer(true, sslContext); + h2server.setExchangeSupplier(new ExchangeSupplier()); + h2server.addHandler(new Handler(), HANDLER_PATH); + h2server.start(); + System.err.println("started HTTP/2 server " + h2server.getAddress()); + } + + @AfterAll + static void afterAll() { + if (h2server != null) { + System.err.println("stopping server " + h2server.getAddress()); + h2server.stop(); + } + } + + /* + * Issues a burst of 100 HTTP/2 requests to the same server (host/port) and expects all of + * them to complete normally. + * Once these requests have completed, the test then peeks into an internal field of the + * HttpClientImpl to verify that the client is holding on to at most 1 connection. + */ + @Test + void testOpenConnections() throws Exception { + final URI reqURI = URIBuilder.newBuilder() + .scheme("https") + .host(h2server.getAddress().getAddress()) + .port(h2server.getAddress().getPort()) + .path(HANDLER_PATH) + .build(); + final HttpRequest req = HttpRequest.newBuilder().uri(reqURI).build(); + + final int numRequests = 100; + // latch for the tasks to wait on, before issuing the requests + final CountDownLatch startLatch = new CountDownLatch(numRequests); + final List> futures = new ArrayList<>(); + + try (final ExecutorService executor = Executors.newCachedThreadPool(); + final HttpClient client = HttpClient.newBuilder() + .sslContext(sslContext) + .proxy(NO_PROXY) + .version(HTTP_2) + .build()) { + // our test needs to peek into the internal field of jdk.internal.net.http.HttpClientImpl, + // so we skip the test if the HttpClient isn't of the expected type + final Object clientImpl = reflectHttpClientImplInstance(client); + assumeTrue(clientImpl != null, + "skipping test against HttpClient of type " + client.getClass().getName()); + + for (int i = 0; i < numRequests; i++) { + final Future f = executor.submit(new RequestIssuer(startLatch, client, req)); + futures.add(f); + } + // wait for the requests to complete + for (final Future f : futures) { + f.get(); + } + + // all requests are done, now verify that the current open TCP connections + // is not more than 1. + final Set currentOpenConns = (Set) openConnections.get(clientImpl); + System.err.println("current open connections: " + currentOpenConns); + final int size = currentOpenConns.size(); + // we expect at most 1 connection will stay open + assertTrue((size == 0 || size == 1), + "unexpected number of current open connections: " + size); + } + } + + // using reflection, return the jdk.internal.net.http.HttpClientImpl instance held + // by the given client + private static Object reflectHttpClientImplInstance(final HttpClient client) throws Exception { + if (!client.getClass().getName().equals("jdk.internal.net.http.HttpClientFacade")) { + return null; + } + final Field implField = client.getClass().getDeclaredField("impl"); + implField.setAccessible(true); + final Object clientImpl = implField.get(client); + if (clientImpl == null) { + return null; + } + if (!clientImpl.getClass().getName().equals("jdk.internal.net.http.HttpClientImpl")) { + return null; + } + return clientImpl; // the expected HttpClientImpl instance + } + + private static final class RequestIssuer implements Callable { + private final CountDownLatch startLatch; + private final HttpClient client; + private final HttpRequest request; + + private RequestIssuer(final CountDownLatch startLatch, final HttpClient client, + final HttpRequest request) { + this.startLatch = startLatch; + this.client = client; + this.request = request; + } + + @Override + public Void call() throws Exception { + this.startLatch.countDown(); // announce our arrival + this.startLatch.await(); // wait for other threads to arrive + // issue the request + final HttpResponse resp = this.client.send(request, BodyHandlers.discarding()); + if (resp.statusCode() != 200) { + throw new AssertionError("unexpected response status code: " + resp.statusCode()); + } + return null; + } + } + + private static final class Handler implements Http2Handler { + private static final int NO_RESP_BODY = -1; + + @Override + public void handle(final Http2TestExchange exchange) throws IOException { + System.err.println("handling request " + exchange.getRequestURI()); + exchange.sendResponseHeaders(200, NO_RESP_BODY); + } + } + + private static final class ExchangeSupplier implements Http2TestExchangeSupplier { + + @Override + public Http2TestExchange get(int streamid, String method, HttpHeaders reqheaders, + HttpHeadersBuilder rspheadersBuilder, URI uri, InputStream is, + SSLSession sslSession, BodyOutputStream os, + Http2TestServerConnection conn, boolean pushAllowed) { + // don't close the connection when/if the client sends a GOAWAY + conn.closeConnOnIncomingGoAway = false; + return Http2TestExchangeSupplier.ofDefault().get(streamid, method, reqheaders, + rspheadersBuilder, uri, is, sslSession, os, conn, pushAllowed); + } + } +} diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java index 20668d281c828..63c0ba5f5b862 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java @@ -117,6 +117,7 @@ public class Http2TestServerConnection { final Properties properties; volatile boolean stopping; volatile int nextPushStreamId = 2; + public volatile boolean closeConnOnIncomingGoAway = true; ConcurrentLinkedQueue pings = new ConcurrentLinkedQueue<>(); // the max stream id of a processed H2 request. -1 implies none were processed. private final AtomicInteger maxProcessedRequestStreamId = new AtomicInteger(-1); @@ -537,8 +538,12 @@ private void handleCommonFrame(Http2Frame f) throws IOException { outputQ.put(frame); return; } else if (f instanceof GoAwayFrame) { - System.err.println(server.name + ": Closing connection: "+ f.toString()); - close(ErrorFrame.NO_ERROR); + if (closeConnOnIncomingGoAway) { + System.err.println(server.name + ": Closing connection: "+ f.toString()); + close(ErrorFrame.NO_ERROR); + } else { + System.err.println(server.name + ": Will not close connection for incoming GOAWAY: " + f); + } } else if (f instanceof PingFrame) { handlePing((PingFrame)f); } else From 88c6bbc2727d5ca87a7b9b7d59b29ba4e8a06cf7 Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 4 Nov 2025 15:37:49 +0530 Subject: [PATCH 3/8] Daniel's review suggestion - stop the scheduler when TubeSubscriber errored or completed --- .../jdk/internal/net/http/Http2Connection.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index b14c547f743ff..dbf8c0241377e 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -1828,6 +1828,20 @@ final void processQueue() { // if there was any error or if the TubeSubscriber completed normally, // then close the connection if (x != null || completed) { + // although the connection terminator stops the scheduler too, + // we don't want to wait that "long" and instead we should immediately + // stop the scheduler so that we don't enter "processQueue" anymore. + scheduler.stop(); + if (client2.stopping()) { + if (debug.on()) { + debug.log("Stopping scheduler"); + } + } else { + if (debug.on()) { + debug.log("Stopping scheduler", x); + } + } + // terminate the connection final Http2TerminationCause tc = (x != null) ? Http2TerminationCause.forException(x) : Http2TerminationCause.noErrorTermination(); @@ -2050,10 +2064,9 @@ private void doTerminate() { debug.log("Closing connection (" + stateStr + ") due to: " + tc); } } - - client2.removeFromPool(Http2Connection.this); // close the TubeSubscriber subscriber.close(); + client2.removeFromPool(Http2Connection.this); // notify the HTTP/2 streams of the connection closure for (final Stream s : streams.values()) { try { From 4cf52bb2669a460d081467a5eb01023dc63495e5 Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Thu, 6 Nov 2025 16:07:58 +0530 Subject: [PATCH 4/8] Return false from isOpen() if underlying channel is not open --- .../internal/net/http/Http2Connection.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index dbf8c0241377e..6cf7a2c9071d0 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -34,6 +34,7 @@ import java.net.http.HttpClient.Version; import java.net.http.HttpHeaders; import java.nio.ByteBuffer; +import java.nio.channels.NetworkChannel; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; @@ -1224,8 +1225,12 @@ private void handleConnectionFrame(Http2Frame frame) } } + /** + * Returns true if this connection hasn't been terminated and the underlying + * {@linkplain NetworkChannel#isOpen() channel is open}. false otherwise. + */ final boolean isOpen() { - return this.connTerminator.terminationCause.get() == null; + return this.connTerminator.terminationCause.get() == null && connection.channel().isOpen(); } void resetStream(int streamid, int code) { @@ -2099,7 +2104,18 @@ private void idleTimedOut() { } private Http2TerminationCause getTerminationCause() { - return this.terminationCause.get(); + final Http2TerminationCause tc = this.terminationCause.get(); + if (tc != null) { + return tc; + } + if (!connection.channel().isOpen()) { + // terminate the connection + terminate(Http2TerminationCause.forException(new IOException("channel is not open"))); + final Http2TerminationCause terminated = this.terminationCause.get(); + assert terminated != null : "missing termination cause"; + return terminated; + } + return null; } } } From 392f694e8f247af9ffb579e17f33ed91bb48a35e Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 11 Nov 2025 17:05:40 +0530 Subject: [PATCH 5/8] update test --- .../httpclient/http2/BurstyRequestsTest.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java b/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java index 19df990c6215d..e53e7d88932d5 100644 --- a/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java +++ b/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java @@ -97,7 +97,7 @@ class BurstyRequestsTest { private static final String HANDLER_PATH = "/8326498/"; - private static Field openConnections; // jdk.internal.net.http.HttpClientImpl#openedConnections + private static Field openConnections; // Set<> jdk.internal.net.http.HttpClientImpl#openedConnections private static SSLContext sslContext; private static Http2TestServer h2server; @@ -165,12 +165,25 @@ void testOpenConnections() throws Exception { for (final Future f : futures) { f.get(); } - - // all requests are done, now verify that the current open TCP connections - // is not more than 1. + System.err.println("all " + numRequests + " requests completed successfully"); + // the request completion happens asynchronously to the closing of the HTTP/2 Stream + // as well as the HTTP/2 connection. we wait for at most 1 connection to be retained + // by HttpClientImpl. + System.err.println("waiting for at least " + (numRequests - 1) + " connections to be closed"); + // now verify that the current open TCP connections is not more than 1. + // we let the test timeout if we never reach that count. final Set currentOpenConns = (Set) openConnections.get(clientImpl); - System.err.println("current open connections: " + currentOpenConns); - final int size = currentOpenConns.size(); + int size = currentOpenConns.size(); + System.err.println("currently " + size + " open connections: " + currentOpenConns); + while (size > 1) { + // wait + Thread.sleep(100); + final int prev = size; + size = currentOpenConns.size(); + if (prev != size) { + System.err.println("currently " + size + " open connections: " + currentOpenConns); + } + } // we expect at most 1 connection will stay open assertTrue((size == 0 || size == 1), "unexpected number of current open connections: " + size); From 6c9830260702c654e821fbde82228cb6e919ed1f Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 11 Nov 2025 18:14:22 +0530 Subject: [PATCH 6/8] cleanup --- .../internal/net/http/Http2Connection.java | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index 6cf7a2c9071d0..24f9e61755bd3 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -881,17 +881,29 @@ final void asyncReceive(ByteBuffer buffer) { } /** - * Returns the exception which caused the connection to be terminated. If the connection - * hasn't yet been terminated or if the connection was terminated normally (without any - * exception) then this method returns an empty Optional. + * Returns the exception which caused the connection to be terminated. Even a normal termination + * of a connection will have a {@code IOException} associated with it. + * If the connection hasn't yet been terminated then this method returns an empty Optional. */ final Optional getTerminationException() { final Http2TerminationCause terminationCause = this.connTerminator.getTerminationCause(); - if (terminationCause == null) { - // connection isn't terminated - return Optional.empty(); + if (terminationCause != null) { + return Optional.of(terminationCause.getCloseCause()); + } + // there can be window of race where the termination cause isn't yet set + // but the connection isn't open. that can happen when the underlying SocketChannel + // is closed behind the scenes and the Http2Connection isn't aware of it and hasn't + // set a termination cause for it. + // so here we check if the connection isn't open and if it isn't then we call + // Terminator.getTerminationCause() which has the necessary infrastructure to create + // and return a termination cause for that situation. + if (!isOpen()) { + final Http2TerminationCause tc = this.connTerminator.getTerminationCause(); + assert tc != null : "termination cause is null for a closed connection"; + return Optional.of(tc.getCloseCause()); } - return Optional.of(terminationCause.getCloseCause()); + // connection isn't terminated + return Optional.empty(); } /** @@ -2103,13 +2115,20 @@ private void idleTimedOut() { this.terminate(Http2TerminationCause.idleTimedOut()); } + /** + * Returns the termination cause for the connection. This method guarantees + * that if the {@linkplain Http2Connection#isOpen() connection is not open} + * then this returns a non-null termination cause. + */ private Http2TerminationCause getTerminationCause() { final Http2TerminationCause tc = this.terminationCause.get(); if (tc != null) { return tc; } if (!connection.channel().isOpen()) { - // terminate the connection + // if the underlying SocketChannel isn't open, then terminate the connection. + // that way when Http2Connection.isOpen() returns false in that situation, then this + // getTerminationCause() will return a termination cause. terminate(Http2TerminationCause.forException(new IOException("channel is not open"))); final Http2TerminationCause terminated = this.terminationCause.get(); assert terminated != null : "missing termination cause"; From 1a70acc67c1efa929a217c45c2d8aaa2798de154 Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 11 Nov 2025 19:37:12 +0530 Subject: [PATCH 7/8] reduce number of concurrent requests --- test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java b/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java index e53e7d88932d5..ae948d53f785a 100644 --- a/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java +++ b/test/jdk/java/net/httpclient/http2/BurstyRequestsTest.java @@ -140,7 +140,7 @@ void testOpenConnections() throws Exception { .build(); final HttpRequest req = HttpRequest.newBuilder().uri(reqURI).build(); - final int numRequests = 100; + final int numRequests = 20; // latch for the tasks to wait on, before issuing the requests final CountDownLatch startLatch = new CountDownLatch(numRequests); final List> futures = new ArrayList<>(); From 3e4574b40e69f60dcc5355e53332f3d18673df4b Mon Sep 17 00:00:00 2001 From: Jaikiran Pai Date: Tue, 11 Nov 2025 19:45:43 +0530 Subject: [PATCH 8/8] mark jdk.internal.net.http.Http2Connection as Closable --- .../classes/jdk/internal/net/http/Http2Connection.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java index 24f9e61755bd3..3429f31c2c09d 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java @@ -25,6 +25,7 @@ package jdk.internal.net.http; +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; @@ -131,7 +132,7 @@ * and incoming stream creation (Server push). Incoming frames destined for a * stream are provided by calling Stream.incoming(). */ -class Http2Connection { +class Http2Connection implements Closeable { final Logger debug = Utils.getDebugLogger(this::dbgString); static final Logger DEBUG_LOGGER = @@ -909,7 +910,8 @@ final Optional getTerminationException() { /** * Closes the connection normally (with a NO_ERROR termination cause), if not already closed. */ - final void close() { + @Override + public final void close() { close(Http2TerminationCause.noErrorTermination()); }