Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions azure/durable_functions/models/utils/http_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,80 @@
from typing import Any, List, Union
from typing import Any, List, Union, Optional
import asyncio

import aiohttp


# Global session and lock for thread-safe initialization
_client_session: Optional[aiohttp.ClientSession] = None
_session_lock: asyncio.Lock = asyncio.Lock()


async def _get_session() -> aiohttp.ClientSession:
"""Get or create the shared ClientSession.

Returns
-------
aiohttp.ClientSession
The shared client session with configured timeout and connection pooling.
"""
global _client_session

# Double-check locking pattern for async
if _client_session is None or _client_session.closed:
async with _session_lock:
# Check again after acquiring lock
if _client_session is None or _client_session.closed:
# Configure timeout optimized for localhost IPC
timeout = aiohttp.ClientTimeout(
total=240, # 4-minute total timeout for slow operations
sock_connect=10, # Fast connection over localhost
sock_read=None # Covered by total timeout
)

# Configure TCP connector optimized for localhost IPC
connector = aiohttp.TCPConnector(
limit=30, # Maximum connections for single host
limit_per_host=30, # Maximum connections per host
enable_cleanup_closed=True # Enable cleanup of closed connections
)

_client_session = aiohttp.ClientSession(
timeout=timeout,
connector=connector
)

return _client_session


async def _handle_request_error():
"""Handle connection errors by closing and resetting the session.

This handles cases where the remote host process recycles.
"""
global _client_session
async with _session_lock:
if _client_session is not None and not _client_session.closed:
try:
await _client_session.close()
finally:
_client_session = None


async def _close_session() -> None:
"""Close the shared ClientSession if it exists.

This function should be called during worker shutdown.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This comment says we should call _close_session on worker shutdown, but it is only called by _handle_request_error. Can we add the call during shutdown or update the comment?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a way to hook into the shutdown? If not, I'll just update the comment since process shutdown should clean up all resources anyways.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we hook into anything from worker/SDK for shutdown currently, not sure if something like that exists. I'm OK just updating the comment.

"""
global _client_session

async with _session_lock:
if _client_session is not None and not _client_session.closed:
try:
await _client_session.close()
finally:
_client_session = None


async def post_async_request(url: str,
data: Any = None,
trace_parent: str = None,
Expand All @@ -25,19 +97,25 @@ async def post_async_request(url: str,
[int, Any]
Tuple with the Response status code and the data returned from the request
"""
async with aiohttp.ClientSession() as session:
headers = {}
if trace_parent:
headers["traceparent"] = trace_parent
if trace_state:
headers["tracestate"] = trace_state
session = await _get_session()
headers = {}
if trace_parent:
headers["traceparent"] = trace_parent
if trace_state:
headers["tracestate"] = trace_state

try:
async with session.post(url, json=data, headers=headers) as response:
# We disable aiohttp's input type validation
# as the server may respond with alternative
# data encodings. This is potentially unsafe.
# More here: https://docs.aiohttp.org/en/stable/client_advanced.html
data = await response.json(content_type=None)
return [response.status, data]
except (aiohttp.ClientError, asyncio.TimeoutError):
# On connection errors, close and recreate session for next request
await _handle_request_error()
raise


async def get_async_request(url: str) -> List[Any]:
Expand All @@ -53,12 +131,18 @@ async def get_async_request(url: str) -> List[Any]:
[int, Any]
Tuple with the Response status code and the data returned from the request
"""
async with aiohttp.ClientSession() as session:
session = await _get_session()

try:
async with session.get(url) as response:
data = await response.json(content_type=None)
if data is None:
data = ""
return [response.status, data]
except (aiohttp.ClientError, asyncio.TimeoutError):
# On connection errors, close and recreate session for next request
await _handle_request_error()
raise


async def delete_async_request(url: str) -> List[Union[int, Any]]:
Expand All @@ -74,7 +158,13 @@ async def delete_async_request(url: str) -> List[Union[int, Any]]:
[int, Any]
Tuple with the Response status code and the data returned from the request
"""
async with aiohttp.ClientSession() as session:
session = await _get_session()

try:
async with session.delete(url) as response:
data = await response.json(content_type=None)
return [response.status, data]
except (aiohttp.ClientError, asyncio.TimeoutError):
# On connection errors, close and recreate session for next request
await _handle_request_error()
raise
Loading