Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"opensearch-py[async] == 2.8.0",
"uvicorn[standard] == 0.35.0",
"httpx[http2] == 0.28.1",
"httpx-retries",
"aiohttp[speedups] == 3.12.15",
"boto3 ~= 1.37",
"fastapi == 0.116.1",
Expand Down
22 changes: 9 additions & 13 deletions yente/data/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,15 @@ async def read_path_lines(path: Path) -> AsyncGenerator[Any, None]:
async def stream_http_lines(
url: str, auth_token: Optional[str] = None
) -> AsyncGenerator[Any, None]:
for retry in count():
try:
async with httpx_session(auth_token=auth_token) as client:
async with client.stream("GET", url) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
yield orjson.loads(line)
return
except httpx.TransportError as exc:
if retry > 3:
raise
await asyncio.sleep(1.0)
log.error("Streaming index HTTP error: %s, retrying..." % exc)
try:
async with httpx_session(auth_token=auth_token) as client:
async with client.stream("GET", url) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
yield orjson.loads(line)
return
except httpx.TransportError as exc:
log.error("Streaming index HTTP error: %s, failed after retries." % exc)


async def load_json_lines(
Expand Down
4 changes: 3 additions & 1 deletion yente/data/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import warnings
import httpx
import unicodedata
from httpx_retries import Retry, RetryTransport
from pathlib import Path
from functools import lru_cache
from urllib.parse import urlparse
Expand Down Expand Up @@ -175,7 +176,8 @@ def auth_flow(
async def httpx_session(
auth_token: Optional[str] = None,
) -> AsyncGenerator[httpx.AsyncClient, None]:
transport = httpx.AsyncHTTPTransport(retries=3)
retry = Retry(total=3, backoff_factor=2)
transport = RetryTransport(transport=httpx.AsyncHTTPTransport(), retry=retry)
proxy = settings.HTTP_PROXY if settings.HTTP_PROXY != "" else None
headers = {"User-Agent": f"Yente/{settings.VERSION}"}
async with httpx.AsyncClient(
Expand Down