diff --git a/pyproject.toml b/pyproject.toml
index dd1a4b07..88b1989c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -14,6 +14,7 @@ dependencies = [
     "opensearch-py[async] == 2.8.0",
     "uvicorn[standard] == 0.35.0",
     "httpx[http2] == 0.28.1",
+    "httpx-retries",
     "aiohttp[speedups] == 3.12.15",
     "boto3 ~= 1.37",
     "fastapi == 0.116.1",
diff --git a/yente/data/loader.py b/yente/data/loader.py
index 9d9e6f49..b4cc811d 100644
--- a/yente/data/loader.py
+++ b/yente/data/loader.py
@@ -61,19 +61,17 @@ async def read_path_lines(path: Path) -> AsyncGenerator[Any, None]:
 async def stream_http_lines(
     url: str, auth_token: Optional[str] = None
 ) -> AsyncGenerator[Any, None]:
-    for retry in count():
-        try:
-            async with httpx_session(auth_token=auth_token) as client:
-                async with client.stream("GET", url) as resp:
-                    resp.raise_for_status()
-                    async for line in resp.aiter_lines():
-                        yield orjson.loads(line)
-                    return
-        except httpx.TransportError as exc:
-            if retry > 3:
-                raise
-            await asyncio.sleep(1.0)
-            log.error("Streaming index HTTP error: %s, retrying..." % exc)
+    try:
+        async with httpx_session(auth_token=auth_token) as client:
+            async with client.stream("GET", url) as resp:
+                resp.raise_for_status()
+                async for line in resp.aiter_lines():
+                    yield orjson.loads(line)
+                return
+    except httpx.TransportError as exc:
+        log.error("Streaming index HTTP error: %s, failed after retries." % exc)
+        # Re-raise so a truncated stream is never mistaken for a complete one.
+        raise
 
 
 async def load_json_lines(
diff --git a/yente/data/util.py b/yente/data/util.py
index 35129f60..25b724a5 100644
--- a/yente/data/util.py
+++ b/yente/data/util.py
@@ -1,6 +1,7 @@
 import warnings
 import httpx
 import unicodedata
+from httpx_retries import Retry, RetryTransport
 from pathlib import Path
 from functools import lru_cache
 from urllib.parse import urlparse
@@ -175,7 +176,8 @@ def auth_flow(
 async def httpx_session(
     auth_token: Optional[str] = None,
 ) -> AsyncGenerator[httpx.AsyncClient, None]:
-    transport = httpx.AsyncHTTPTransport(retries=3)
+    retry = Retry(total=3, backoff_factor=2)
+    transport = RetryTransport(transport=httpx.AsyncHTTPTransport(), retry=retry)
     proxy = settings.HTTP_PROXY if settings.HTTP_PROXY != "" else None
     headers = {"User-Agent": f"Yente/{settings.VERSION}"}
     async with httpx.AsyncClient(