diff --git a/httpcore/.DS_Store b/httpcore/.DS_Store new file mode 100644 index 000000000..c2989f810 Binary files /dev/null and b/httpcore/.DS_Store differ diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 0493a923d..b836d8290 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -25,7 +25,7 @@ map_exceptions, ) from .._models import Origin, Request, Response -from .._synchronization import AsyncLock, AsyncShieldCancellation +from .._synchronization import AsyncThreadLock from .._trace import Trace from .interfaces import AsyncConnectionInterface @@ -62,7 +62,9 @@ def __init__( self._keepalive_expiry: Optional[float] = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW - self._state_lock = AsyncLock() + self._state_thread_lock = ( + AsyncThreadLock() + ) # thread-lock for sync, no-op for async self._request_count = 0 self._h11_state = h11.Connection( our_role=h11.CLIENT, @@ -76,7 +78,9 @@ async def handle_async_request(self, request: Request) -> Response: f"to {self._origin}" ) - async with self._state_lock: + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): self._request_count += 1 self._state = HTTPConnectionState.ACTIVE @@ -137,9 +141,8 @@ async def handle_async_request(self, request: Request) -> Response: }, ) except BaseException as exc: - with AsyncShieldCancellation(): - async with Trace("response_closed", logger, request) as trace: - await self._response_closed() + if self._connection_should_close(): + await self._network_stream.aclose() raise exc # Sending the request... @@ -242,8 +245,12 @@ async def _receive_event( # mypy fails to narrow the type in the above if statement above return event # type: ignore[return-value] - async def _response_closed(self) -> None: - async with self._state_lock: + def _connection_should_close(self) -> bool: + # Once the response is complete we either need to move into + # an IDLE or CLOSED state. + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if ( self._h11_state.our_state is h11.DONE and self._h11_state.their_state is h11.DONE @@ -253,8 +260,10 @@ async def _response_closed(self) -> None: if self._keepalive_expiry is not None: now = time.monotonic() self._expire_at = now + self._keepalive_expiry - else: - await self.aclose() + return False + + self._state = HTTPConnectionState.CLOSED + return True # Once the connection is no longer required... @@ -344,15 +353,16 @@ async def __aiter__(self) -> AsyncIterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - with AsyncShieldCancellation(): - await self.aclose() + if self._connection._connection_should_close(): + await self._connection.aclose() raise exc async def aclose(self) -> None: - if not self._closed: - self._closed = True - async with Trace("response_closed", logger, self._request): - await self._connection._response_closed() + async with Trace("response_closed", logger, self._request, kwargs={}): + if not self._closed: + self._closed = True + if self._connection._connection_should_close(): + await self._connection.aclose() class AsyncHTTP11UpgradeStream(AsyncNetworkStream): diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index a74ff8e80..c27f0d324 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -25,7 +25,7 @@ map_exceptions, ) from .._models import Origin, Request, Response -from .._synchronization import Lock, ShieldCancellation +from .._synchronization import ThreadLock from .._trace import Trace from .interfaces import ConnectionInterface @@ -62,7 +62,9 @@ def __init__( self._keepalive_expiry: Optional[float] = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW - self._state_lock = Lock() + self._state_thread_lock = ( + ThreadLock() + ) # thread-lock for sync, no-op for async self._request_count = 0 self._h11_state = h11.Connection( our_role=h11.CLIENT, @@ -76,7 +78,9 @@ def handle_request(self, request: Request) -> Response: f"to {self._origin}" ) - with self._state_lock: + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): self._request_count += 1 self._state = HTTPConnectionState.ACTIVE @@ -137,9 +141,8 @@ def handle_request(self, request: Request) -> Response: }, ) except BaseException as exc: - with ShieldCancellation(): - with Trace("response_closed", logger, request) as trace: - self._response_closed() + if self._connection_should_close(): + self._network_stream.close() raise exc # Sending the request... @@ -242,8 +245,12 @@ def _receive_event( # mypy fails to narrow the type in the above if statement above return event # type: ignore[return-value] - def _response_closed(self) -> None: - with self._state_lock: + def _connection_should_close(self) -> bool: + # Once the response is complete we either need to move into + # an IDLE or CLOSED state. + with self._state_thread_lock: + # We ensure that state changes at the start and end of a + # request/response cycle are thread-locked. if ( self._h11_state.our_state is h11.DONE and self._h11_state.their_state is h11.DONE @@ -253,8 +260,10 @@ def _response_closed(self) -> None: if self._keepalive_expiry is not None: now = time.monotonic() self._expire_at = now + self._keepalive_expiry - else: - self.close() + return False + + self._state = HTTPConnectionState.CLOSED + return True # Once the connection is no longer required... @@ -344,15 +353,16 @@ def __iter__(self) -> Iterator[bytes]: # If we get an exception while streaming the response, # we want to close the response (and possibly the connection) # before raising that exception. - with ShieldCancellation(): - self.close() + if self._connection._connection_should_close(): + self._connection.close() raise exc def close(self) -> None: - if not self._closed: - self._closed = True - with Trace("response_closed", logger, self._request): - self._connection._response_closed() + with Trace("response_closed", logger, self._request, kwargs={}): + if not self._closed: + self._closed = True + if self._connection._connection_should_close(): + self._connection.close() class HTTP11UpgradeStream(NetworkStream): diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 2fc272049..27e47eb34 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -398,8 +398,6 @@ async def trace(name, kwargs): "http11.send_request_body.complete", "http11.receive_response_headers.started", "http11.receive_response_headers.failed", - "http11.response_closed.started", - "http11.response_closed.complete", ] diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index ee303e5cf..c73a7c2f7 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -398,8 +398,6 @@ def trace(name, kwargs): "http11.send_request_body.complete", "http11.receive_response_headers.started", "http11.receive_response_headers.failed", - "http11.response_closed.started", - "http11.response_closed.complete", ]