Skip to content

Commit da57e20

Browse files
committed
Have async raise 4xx/5xx status codes and add simultaneous connection limit
1 parent 12ac99c commit da57e20

File tree

1 file changed

+8
-9
lines changed

1 file changed

+8
-9
lines changed

src/client/delphi_epidata.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import asyncio
1414
import warnings
1515

16-
from aiohttp import ClientSession
16+
from aiohttp import ClientSession, TCPConnector
1717
from pkg_resources import get_distribution, DistributionNotFound
1818

1919
# Obtain package version for the user-agent. Uses the installed version by
@@ -709,27 +709,26 @@ def covidcast_nowcast(
709709
return Epidata._request(params)
710710

711711
@staticmethod
712-
def async_epidata(param_list, batch_size=100):
712+
def async_epidata(param_list, batch_size=50):
713713
"""Make asynchronous Epidata calls for a list of parameters."""
714714
async def async_get(params, session):
715715
"""Helper function to make Epidata GET requests."""
716716
async with session.get(Epidata.BASE_URL, params=params) as response:
717+
response.raise_for_status()
717718
return await response.json(), params
718719

719720
async def async_make_calls(param_combos):
720721
"""Helper function to asynchronously make and aggregate Epidata GET requests."""
721722
tasks = []
722-
async with ClientSession() as session:
723+
connector = TCPConnector(limit=batch_size)
724+
async with ClientSession(connector=connector) as session:
723725
for param in param_combos:
724726
task = asyncio.ensure_future(async_get(param, session))
725727
tasks.append(task)
726728
responses = await asyncio.gather(*tasks)
727729
return responses
728730

729-
batches = [param_list[i:i+batch_size] for i in range(0, len(param_list), batch_size)]
730-
responses = []
731-
for batch in batches:
732-
loop = asyncio.get_event_loop()
733-
future = asyncio.ensure_future(async_make_calls(batch))
734-
responses += loop.run_until_complete(future)
731+
loop = asyncio.get_event_loop()
732+
future = asyncio.ensure_future(async_make_calls(param_list))
733+
responses = loop.run_until_complete(future)
735734
return responses

0 commit comments

Comments
 (0)