diff --git a/src/Connection/Http1Connection.php b/src/Connection/Http1Connection.php index 2b0a99c0..fac7f1e9 100644 --- a/src/Connection/Http1Connection.php +++ b/src/Connection/Http1Connection.php @@ -320,6 +320,7 @@ private function readResponse( while (null !== $chunk = $this->readChunk($timeout)) { parseChunk: $response = $parser->parse($chunk); + if ($response === null) { if ($this->socket === null) { throw new SocketException('Socket closed prior to response completion'); @@ -377,17 +378,20 @@ private function readResponse( $bodyDeferredCancellation )); + [$reqTimeout, $explicitTimeout, $priorTimeout] = $this->determineKeepAliveTimeout($response); + // Read body async EventLoop::queue(function () use ( $parser, $request, - $response, + $reqTimeout, + $explicitTimeout, + $priorTimeout, $bodyEmitter, $trailersDeferred, $originalCancellation, $readingCancellation, $bodyCancellation, - $stream, $timeout, &$trailers ) { @@ -443,7 +447,14 @@ private function readResponse( } } - $timeout = $this->determineKeepAliveTimeout($response); + if ($explicitTimeout) { + $this->explicitTimeout = $explicitTimeout; + } + if ($priorTimeout !== null) { + $this->priorTimeout = $priorTimeout; + } + + $timeout = $reqTimeout; if ($timeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) { $this->timeoutWatcher = EventLoop::delay($timeout, $this->close(...)); @@ -546,7 +557,8 @@ private function getRemainingTime(): float return \max(0, $timestamp - now()); } - private function determineKeepAliveTimeout(Response $response): int + /** @return list{int, bool, ?int} */ + private function determineKeepAliveTimeout(Response $response): array { $request = $response->getRequest(); @@ -554,25 +566,21 @@ private function determineKeepAliveTimeout(Response $response): int $responseConnHeader = $response->getHeader('connection') ?? ''; if (!\strcasecmp($requestConnHeader, 'close')) { - return 0; + return [0, false, null]; } if ($response->getProtocolVersion() === '1.0') { - return 0; + return [0, false, null]; } if (!\strcasecmp($responseConnHeader, 'close')) { - return 0; + return [0, false, null]; } $params = Http\parseMultipleHeaderFields($response, 'keep-alive')[0] ?? null; $timeout = (int) ($params['timeout'] ?? $this->priorTimeout); - if (isset($params['timeout'])) { - $this->explicitTimeout = true; - } - - return $this->priorTimeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT); + return [0, isset($params['timeout']), \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT)]; } /** diff --git a/src/Connection/Internal/Http1Parser.php b/src/Connection/Internal/Http1Parser.php index 7b8914b3..dc77da27 100644 --- a/src/Connection/Internal/Http1Parser.php +++ b/src/Connection/Internal/Http1Parser.php @@ -13,6 +13,8 @@ use Amp\Http\HttpMessage; use Amp\Http\HttpStatus; use Amp\Http\InvalidHeaderException; +use WeakReference; + use function Amp\Http\Client\events; use function Amp\Http\mapHeaderPairs; @@ -39,7 +41,8 @@ final class Http1Parser public const TRAILERS_START = 4; public const TRAILERS = 5; - private ?Response $response = null; + /** @var ?WeakReference */ + private ?WeakReference $response = null; private int $state = self::AWAITING_HEADERS; @@ -110,9 +113,11 @@ public function parse(?string $data = null): ?Response if (!$this->bodyStarted && \in_array($this->state, [self::BODY_CHUNKS, self::BODY_IDENTITY, self::BODY_IDENTITY_EOF], true)) { $this->bodyStarted = true; - $response = $this->response; - \assert($response !== null); - events()->responseBodyStart($this->request, $this->stream, $response); + $response = $this->response?->get(); + if ($response !== null) { + events()->responseBodyStart($this->request, $this->stream, $response); + unset($response); + } } switch ($this->state) { @@ -185,13 +190,15 @@ public function parse(?string $data = null): ?Response events()->responseHeaderEnd($this->request, $this->stream, $response); - return $this->response = $response; + $this->response = WeakReference::create($response); + return $response; } body_identity: { - if ($data !== null && $data !== '') { - events()->responseBodyProgress($this->request, $this->stream, $this->response); + if ($data !== null && $data !== '' && ($r = $this->response->get())) { + events()->responseBodyProgress($this->request, $this->stream, $r); + unset($r); } $bufferDataSize = \strlen($this->buffer); @@ -219,8 +226,9 @@ public function parse(?string $data = null): ?Response body_identity_eof: { - if ($data !== null && $data !== '') { - events()->responseBodyProgress($this->request, $this->stream, $this->response); + if ($data !== null && $data !== '' && ($r = $this->response->get())) { + events()->responseBodyProgress($this->request, $this->stream, $r); + unset($r); } $this->addToBody($this->buffer); @@ -230,8 +238,9 @@ public function parse(?string $data = null): ?Response body_chunks: { - if ($data !== null && $data !== '') { - events()->responseBodyProgress($this->request, $this->stream, $this->response); + if ($data !== null && $data !== '' && ($r = $this->response->get())) { + events()->responseBodyProgress($this->request, $this->stream, $r); + unset($r); } if ($this->parseChunkedBody()) { @@ -272,7 +281,10 @@ public function parse(?string $data = null): ?Response complete: { - events()->responseBodyEnd($this->request, $this->stream, $this->response); + if ($r = $this->response->get()) { + events()->responseBodyEnd($this->request, $this->stream, $r); + unset($r); + } $this->complete = true; diff --git a/test/Connection/ConnectionLimitingPoolTest.php b/test/Connection/ConnectionLimitingPoolTest.php index 669d3d50..070c1fe9 100644 --- a/test/Connection/ConnectionLimitingPoolTest.php +++ b/test/Connection/ConnectionLimitingPoolTest.php @@ -39,6 +39,42 @@ public function testSingleConnection(): void ]); } + public function testSingleConnectionDoNotUseBody(): void + { + $client = (new HttpClientBuilder) + ->usingPool(ConnectionLimitingPool::byAuthority(1)) + ->build(); + + $this->setMinimumRuntime(2); + + $r1 = new Request('https://httpbin.org/delay/1'); + $r1->setProtocolVersions(['1.1']); + $r2 = new Request('https://httpbin.org/delay/1'); + $r2->setProtocolVersions(['1.1']); + Future\await([ + async($client->request(...), $r1), + async($client->request(...), $r2), + ]); + } + + public function testSingleConnectionDoNotUseBodyHttp2(): void + { + $client = (new HttpClientBuilder) + ->usingPool(ConnectionLimitingPool::byAuthority(1)) + ->build(); + + $this->setMinimumRuntime(1); + + $r1 = new Request('https://httpbin.org/delay/1'); + $r1->setProtocolVersions(['2']); + $r2 = new Request('https://httpbin.org/delay/1'); + $r2->setProtocolVersions(['2']); + Future\await([ + async($client->request(...), $r1), + async($client->request(...), $r2), + ]); + } + public function testTwoConnections(): void { $client = (new HttpClientBuilder)