Skip to content

Commit 9f5e3ac

Browse files
authored
Merge pull request #396 from chinandrew/epidata-async
Add async method to epidata
2 parents 92742d5 + 2dc1065 commit 9f5e3ac

File tree

3 files changed

+75
-0
lines changed

3 files changed

+75
-0
lines changed

integrations/client/test_delphi_epidata.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,3 +471,48 @@ def test_covidcast_nowcast(self):
471471
'src', 'sig1', 'sensor', 'day', 'county', 22222222, '01001')
472472

473473
self.assertEqual(response, {'result': -2, 'message': 'no results'})
474+
475+
def test_async_epidata(self):
476+
# insert dummy data
477+
self.cur.execute('''
478+
insert into covidcast values
479+
(0, 'src', 'sig', 'day', 'county', 20200414, '11111',
480+
123, 10, 11, 12, 456, 13, 20200414, 0, 1, False),
481+
(0, 'src', 'sig', 'day', 'county', 20200414, '22222',
482+
123, 20, 21, 22, 456, 23, 20200414, 0, 1, False),
483+
(0, 'src', 'sig', 'day', 'county', 20200414, '33333',
484+
123, 30, 31, 32, 456, 33, 20200414, 0, 1, False),
485+
(0, 'src', 'sig', 'day', 'msa', 20200414, '11111',
486+
123, 40, 41, 42, 456, 43, 20200414, 0, 1, False),
487+
(0, 'src', 'sig', 'day', 'msa', 20200414, '22222',
488+
123, 50, 51, 52, 456, 53, 20200414, 0, 1, False),
489+
(0, 'src', 'sig', 'day', 'msa', 20200414, '33333',
490+
123, 60, 61, 62, 456, 634, 20200414, 0, 1, False)
491+
''')
492+
self.cnx.commit()
493+
test_output = Epidata.async_epidata([
494+
{
495+
'source': 'covidcast',
496+
'data_source': 'src',
497+
'signals': 'sig',
498+
'time_type': 'day',
499+
'geo_type': 'county',
500+
'geo_value': '11111',
501+
'time_values': '20200414'
502+
},
503+
{
504+
'source': 'covidcast',
505+
'data_source': 'src',
506+
'signals': 'sig',
507+
'time_type': 'day',
508+
'geo_type': 'county',
509+
'geo_value': '00000',
510+
'time_values': '20200414'
511+
}
512+
], batch_size=10)
513+
responses = [i[0] for i in test_output]*12
514+
# check response is same as standard covidcast call, using 24 calls to test batch sizing
515+
self.assertEqual(responses,
516+
[Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '11111'),
517+
Epidata.covidcast('src', 'sig', 'day', 'county', 20200414, '00000')]*12
518+
)

src/client/delphi_epidata.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010

1111
# External modules
1212
import requests
13+
import asyncio
14+
import warnings
1315

16+
from aiohttp import ClientSession
1417
from pkg_resources import get_distribution, DistributionNotFound
1518

1619
# Obtain package version for the user-agent. Uses the installed version by
@@ -704,3 +707,29 @@ def covidcast_nowcast(
704707

705708
# Make the API call
706709
return Epidata._request(params)
710+
711+
@staticmethod
712+
def async_epidata(param_list, batch_size=100):
713+
"""Make asynchronous Epidata calls for a list of parameters."""
714+
async def async_get(params, session):
715+
"""Helper function to make Epidata GET requests."""
716+
async with session.get(Epidata.BASE_URL, params=params) as response:
717+
return await response.json(), params
718+
719+
async def async_make_calls(param_combos):
720+
"""Helper function to asynchronously make and aggregate Epidata GET requests."""
721+
tasks = []
722+
async with ClientSession() as session:
723+
for param in param_combos:
724+
task = asyncio.ensure_future(async_get(param, session))
725+
tasks.append(task)
726+
responses = await asyncio.gather(*tasks)
727+
return responses
728+
729+
batches = [param_list[i:i+batch_size] for i in range(0, len(param_list), batch_size)]
730+
responses = []
731+
for batch in batches:
732+
loop = asyncio.get_event_loop()
733+
future = asyncio.ensure_future(async_make_calls(batch))
734+
responses += loop.run_until_complete(future)
735+
return responses

src/client/packaging/pypi/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
url='https://github.com/cmu-delphi/delphi-epidata',
1515
packages=setuptools.find_packages(),
1616
install_requires=[
17+
'aiohttp'
1718
'requests>=2.7.0',
1819
],
1920
classifiers=[

0 commit comments

Comments
 (0)