Skip to content

Commit 9e46bc2

Browse files
committed
Make combo cases/deaths robust to source outages
- okay to return just one of the data frames if both aren't available
- do something sensible if neither one is available
- unit tests to verify all possible cases are handled appropriately

Fixes #330
1 parent b21252f commit 9e46bc2

File tree

3 files changed

+92
-38
lines changed

3 files changed

+92
-38
lines changed

combo_cases_and_deaths/delphi_combo_cases_and_deaths/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
"state",
1919
"msa",
2020
"hrr",
21-
]
21+
]

combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,46 +9,68 @@
99
from datetime import date, timedelta, datetime
1010
from itertools import product
1111
import re
12-
import sys
1312

1413
import covidcast
1514
import pandas as pd
1615

17-
from delphi_utils import read_params, create_export_csv
18-
from .constants import *
19-
from .handle_wip_signal import *
16+
from delphi_utils import read_params
17+
from .constants import METRICS, SMOOTH_TYPES, SENSORS, GEO_RESOLUTIONS
18+
from .handle_wip_signal import add_prefix
2019

2120

2221
def check_not_none(data_frame, label, date_range):
2322
"""Print a warning and return False if a retrieved data frame is None; otherwise return True."""
2423
if data_frame is None:
2524
print(f"{label} not available in range {date_range}")
26-
sys.exit(1)
25+
return False
26+
return True
2727

28-
def combine_usafacts_and_jhu(signal, geo, date_range):
28+
def maybe_append(df1, df2):
29+
"""
30+
If both data frames are available, append them and return. Otherwise, return
31+
whichever frame is not None.
32+
"""
33+
if df1 is None:
34+
return df2
35+
if df2 is None:
36+
return df1
37+
return df1.append(df2)
38+
39+
COLUMN_MAPPING = {"time_value": "timestamp",
40+
"geo_value": "geo_id",
41+
"value": "val",
42+
"stderr": "se",
43+
"sample_size": "sample_size"}
44+
def combine_usafacts_and_jhu(signal, geo, date_range, fetcher=covidcast.signal):
2945
"""
3046
Add rows for PR from JHU signals to USA-FACTS signals
3147
"""
32-
usafacts_df = covidcast.signal("usa-facts", signal, date_range[0], date_range[1], geo)
33-
jhu_df = covidcast.signal("jhu-csse", signal, date_range[0], date_range[1], geo)
34-
check_not_none(usafacts_df, "USA-FACTS", date_range)
35-
check_not_none(jhu_df, "JHU", date_range)
48+
print("Fetching usa-facts...")
49+
usafacts_df = fetcher("usa-facts", signal, date_range[0], date_range[1], geo)
50+
print("Fetching jhu-csse...")
51+
jhu_df = fetcher("jhu-csse", signal, date_range[0], date_range[1], geo)
52+
53+
if (not check_not_none(usafacts_df, "USA-FACTS", date_range)) and \
54+
(geo not in ('state', 'county') or \
55+
(not check_not_none(jhu_df, "JHU", date_range))):
56+
return pd.DataFrame({}, columns=COLUMN_MAPPING.values())
3657

3758
# State level
3859
if geo == 'state':
39-
combined_df = usafacts_df.append(jhu_df[jhu_df["geo_value"] == 'pr'])
60+
combined_df = maybe_append(
61+
usafacts_df,
62+
jhu_df if jhu_df is None else jhu_df[jhu_df["geo_value"] == 'pr'])
4063
# County level
4164
elif geo == 'county':
42-
combined_df = usafacts_df.append(jhu_df[jhu_df["geo_value"] == '72000'])
65+
combined_df = maybe_append(
66+
usafacts_df,
67+
jhu_df if jhu_df is None else jhu_df[jhu_df["geo_value"] == '72000'])
4368
# For MSA and HRR level, they are the same
4469
else:
4570
combined_df = usafacts_df
4671

4772
combined_df = combined_df.drop(["direction"], axis=1)
48-
combined_df = combined_df.rename({"time_value": "timestamp",
49-
"geo_value": "geo_id",
50-
"value": "val",
51-
"stderr": "se"},
73+
combined_df = combined_df.rename(COLUMN_MAPPING,
5274
axis=1)
5375
return combined_df
5476

@@ -83,15 +105,12 @@ def sensor_signal(metric, sensor, smoother):
83105
sensor_name = "_".join([smoother, sensor])
84106
else:
85107
sensor_name = sensor
86-
signal = "_".join([metric, sensor_name])
87-
return sensor_name, signal
88-
89-
def run_module():
90-
"""Produce a combined cases and deaths signal using data from JHU and USA Facts"""
91-
variants = [tuple((metric, geo_res)+sensor_signal(metric, sensor, smoother))
92-
for (metric, geo_res, sensor, smoother) in
93-
product(METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTH_TYPES)]
108+
return sensor_name, "_".join([metric, sensor_name])
94109

110+
def configure(variants):
111+
"""
112+
Validate params file and set date range.
113+
"""
95114
params = read_params()
96115
params['export_start_date'] = date(*params['export_start_date'])
97116
yesterday = date.today() - timedelta(days=1)
@@ -112,30 +131,38 @@ def run_module():
112131
# create combined files for all of the historical reports
113132
params['date_range'] = [params['export_start_date'], yesterday]
114133
else:
115-
pattern = re.compile(r'^\d{8}-\d{8}$')
116-
match_res = re.findall(pattern, params['date_range'])
134+
match_res = re.findall(re.compile(r'^\d{8}-\d{8}$'), params['date_range'])
117135
if len(match_res) == 0:
118136
raise ValueError(
119137
"Invalid date_range parameter. Please choose from (new, all, yyyymmdd-yyyymmdd).")
120138
try:
121139
date1 = datetime.strptime(params['date_range'][:8], '%Y%m%d').date()
122-
except ValueError:
123-
raise ValueError("Invalid date_range parameter. Please check the first date.")
140+
except ValueError as error:
141+
raise ValueError(
142+
"Invalid date_range parameter. Please check the first date.") from error
124143
try:
125144
date2 = datetime.strptime(params['date_range'][-8:], '%Y%m%d').date()
126-
except ValueError:
127-
raise ValueError("Invalid date_range parameter. Please check the second date.")
145+
except ValueError as error:
146+
raise ValueError(
147+
"Invalid date_range parameter. Please check the second date.") from error
128148

129149
# Clamp date1 to the valid export start date
130150
if date1 < params['export_start_date']:
131151
date1 = params['export_start_date']
132152
params['date_range'] = [date1, date2]
153+
return params
133154

134-
for metric, geo_res, sensor_name, signal in variants:
135155

136-
df = combine_usafacts_and_jhu(signal, geo_res, extend_raw_date_range(params, sensor_name))
156+
def run_module():
157+
"""Produce a combined cases and deaths signal using data from JHU and USA Facts"""
158+
variants = [tuple((metric, geo_res)+sensor_signal(metric, sensor, smoother))
159+
for (metric, geo_res, sensor, smoother) in
160+
product(METRICS, GEO_RESOLUTIONS, SENSORS, SMOOTH_TYPES)]
161+
params = configure(variants)
162+
for metric, geo_res, sensor_name, signal in variants:
163+
df = combine_usafacts_and_jhu(signal, geo_res, extend_raw_date_range(params, sensor_name)) # pylint: disable=invalid-name
137164

138-
df = df.copy()
165+
df = df.copy() # pylint: disable=invalid-name
139166
df["timestamp"] = pd.to_datetime(df["timestamp"])
140167
start_date = pd.to_datetime(params['export_start_date'])
141168
export_dir = params["export_dir"]
@@ -145,8 +172,7 @@ def run_module():
145172

146173
signal_name = add_prefix([signal], wip_signal=params["wip_signal"], prefix="wip_")
147174
for date_ in dates:
148-
export_fn = f'{date_.strftime("%Y%m%d")}_{geo_res}_' f"{signal_name[0]}.csv"
175+
export_fn = f'{date_.strftime("%Y%m%d")}_{geo_res}_{signal_name[0]}.csv'
149176
df[df["timestamp"] == date_][["geo_id", "val", "se", "sample_size", ]].to_csv(
150177
f"{export_dir}/{export_fn}", index=False, na_rep="NA"
151178
)
152-

combo_cases_and_deaths/tests/test_run.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
from itertools import product
33
import pytest
44
import unittest
5+
import pandas as pd
56

6-
from delphi_combo_cases_and_deaths.run import extend_raw_date_range, sensor_signal
7+
from delphi_combo_cases_and_deaths.run import extend_raw_date_range, sensor_signal, combine_usafacts_and_jhu, COLUMN_MAPPING
78
from delphi_combo_cases_and_deaths.handle_wip_signal import add_prefix
89
from delphi_utils import read_params
9-
from delphi_combo_cases_and_deaths.constants import *
10+
from delphi_combo_cases_and_deaths.constants import METRICS, SMOOTH_TYPES, SENSORS, GEO_RESOLUTIONS
1011

1112

1213
def test_issue_dates():
@@ -49,6 +50,33 @@ def test_handle_wip_signal():
4950
assert signal_names[0].startswith("wip_")
5051
assert all(not s.startswith("wip_") for s in signal_names[1:])
5152

53+
def test_unstable_sources():
54+
placeholder = lambda geo: pd.DataFrame(
55+
[(date.today(),"pr" if geo == "state" else "72000",1,1,1,0)],
56+
columns="time_value geo_value value stderr sample_size direction".split())
57+
fetcher10 = lambda *x: placeholder(x[-1]) if x[0] == "usa-facts" else None
58+
fetcher01 = lambda *x: placeholder(x[-1]) if x[0] == "jhu-csse" else None
59+
fetcher11 = lambda *x: placeholder(x[-1])
60+
fetcher00 = lambda *x: None
61+
62+
date_range = [date.today(), date.today()]
63+
64+
for geo in "state county msa".split():
65+
for (fetcher, expected_size) in [
66+
(fetcher00, 0),
67+
(fetcher01, 0 if geo == "msa" else 1),
68+
(fetcher10, 1),
69+
(fetcher11, 1 if geo == "msa" else 2)
70+
]:
71+
df = combine_usafacts_and_jhu("", geo, date_range, fetcher)
72+
assert df.size == expected_size * len(COLUMN_MAPPING), f"""
73+
input for {geo}:
74+
{fetcher('usa-facts',geo)}
75+
{fetcher('jhu-csse',geo)}
76+
77+
output:
78+
{df}
79+
"""
5280

5381
class MyTestCase(unittest.TestCase):
5482
pass

0 commit comments

Comments
 (0)