Skip to content

Commit 6e40663

Browse files
authored
Merge pull request #1393 from cmu-delphi/release/indicators_v0.2.10_utils_v0.2.6
Release covidcast-indicators 0.2.10
2 parents fd82e19 + a0f94aa commit 6e40663

File tree

17 files changed

+123
-49
lines changed

17 files changed

+123
-49
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 0.2.9
2+
current_version = 0.2.10
33
commit = True
44
message = chore: bump covidcast-indicators to {new_version}
55
tag = False

ansible/templates/changehc-params-prod.json.j2

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
"parallel": false,
2323
"geos": ["state", "msa", "hrr", "county", "hhs", "nation"],
2424
"weekday": [true, false],
25-
"types": ["covid","cli"],
25+
"types": ["covid","cli","flu"],
2626
"wip_signal": "",
2727
"ftp_conn": {
2828
"host": "{{ changehc_sftp_host }}",

changehc/delphi_changehc/constants.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
SMOOTHED_ADJ = "smoothed_adj_outpatient_covid"
44
SMOOTHED_CLI = "smoothed_outpatient_cli"
55
SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli"
6-
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI]
6+
SMOOTHED_FLU = "smoothed_outpatient_flu"
7+
SMOOTHED_ADJ_FLU = "smoothed_adj_outpatient_flu"
8+
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU]
79
NA = "NA"
810
HRR = "hrr"
911
FIPS = "fips"

changehc/delphi_changehc/download_ftp_files.py

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,35 +41,8 @@ def get_files_from_dir(sftp, filedate, out_path):
4141
sftp.get(infile, outfile, callback=callback_for_filename)
4242

4343

44-
def download_covid(filedate, out_path, ftp_conn):
45-
"""Download files necessary to create chng-covid signal from ftp server.
46-
47-
Args:
48-
filedate: YYYYmmdd string for which the files are named
49-
out_path: Path to local directory into which to download the files
50-
ftp_conn: Dict containing login credentials to ftp server
51-
"""
52-
# open client
53-
try:
54-
client = paramiko.SSHClient()
55-
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
56-
57-
client.connect(ftp_conn["host"], username=ftp_conn["user"],
58-
password=ftp_conn["pass"],
59-
port=ftp_conn["port"],
60-
allow_agent=False, look_for_keys=False)
61-
sftp = client.open_sftp()
62-
63-
sftp.chdir('/countproducts')
64-
get_files_from_dir(sftp, filedate, out_path)
65-
66-
finally:
67-
if client:
68-
client.close()
69-
70-
71-
def download_cli(filedate, out_path, ftp_conn):
72-
"""Download files necessary to create chng-cli signal from ftp server.
44+
def download_counts(filedate, out_path, ftp_conn):
45+
"""Download files necessary to create chng- signals from ftp server.
7346
7447
Args:
7548
filedate: YYYYmmdd string for which the files are named

changehc/delphi_changehc/load_data.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,36 @@ def load_cli_data(denom_filepath, flu_filepath, mixed_filepath, flu_like_filepat
153153
data = data[["num", "den"]]
154154

155155
return data
156+
157+
158+
def load_flu_data(denom_filepath, flu_filepath, dropdate, base_geo):
    """Combine the denominator and flu count files into a num/den table.

    Each file is read with `load_chng_data` using its own column settings
    from `Config`, the two results are outer-joined on their index, and any
    count missing from one side is filled with zero.

    Args:
        denom_filepath: path to the aggregated denominator data
        flu_filepath: path to the aggregated flu data
        dropdate: data drop date (datetime object)
        base_geo: base geographic unit before aggregation; must be 'fips'

    Returns:
        combined multiindexed dataframe (index 0 is base_geo, index 1 is
        date) with exactly two columns: "num" (flu counts) and "den"
        (denominator counts)

    Raises:
        AssertionError: if `base_geo` is not "fips", or if any merged row is
            NA in every column
    """
    assert base_geo == "fips", "base unit must be 'fips'"

    # Read each data stream with its own column layout.
    denominators = load_chng_data(
        denom_filepath, dropdate, base_geo,
        Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL,
    )
    flu_counts = load_chng_data(
        flu_filepath, dropdate, base_geo,
        Config.FLU_COLS, Config.FLU_DTYPES, Config.FLU_COL,
    )

    # Outer join so index entries present in only one stream are kept.
    merged = denominators.merge(
        flu_counts, how="outer", left_index=True, right_index=True
    )
    fully_missing_rows = merged.isna().all(axis=1).sum()
    assert fully_missing_rows == 0, "entire row is NA after merge"

    # A count absent from one stream is treated as zero before building
    # the numerator and denominator columns.
    merged = merged.fillna(0)
    merged["num"] = merged[Config.FLU_COL]
    merged["den"] = merged[Config.DENOM_COL]

    return merged[["num", "den"]]

changehc/delphi_changehc/run.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
from delphi_utils import get_structured_logger
1515

1616
# first party
17-
from .download_ftp_files import download_covid, download_cli
18-
from .load_data import load_combined_data, load_cli_data
17+
from .download_ftp_files import download_counts
18+
from .load_data import load_combined_data, load_cli_data, load_flu_data
1919
from .update_sensor import CHCSensorUpdater
2020

2121

@@ -26,10 +26,7 @@ def retrieve_files(params, filedate, logger):
2626

2727
## download recent files from FTP server
2828
logger.info("downloading recent files through SFTP")
29-
if "covid" in params["indicator"]["types"]:
30-
download_covid(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])
31-
if "cli" in params["indicator"]["types"]:
32-
download_cli(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])
29+
download_counts(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])
3330

3431
denom_file = "%s/%s_Counts_Products_Denom.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
3532
covid_file = "%s/%s_Counts_Products_Covid.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
@@ -53,6 +50,8 @@ def retrieve_files(params, filedate, logger):
5350
file_dict["mixed"] = mixed_file
5451
file_dict["flu_like"] = flu_like_file
5552
file_dict["covid_like"] = covid_like_file
53+
if "flu" in params["indicator"]["types"]:
54+
file_dict["flu"] = flu_file
5655
return file_dict
5756

5857

@@ -75,6 +74,9 @@ def make_asserts(params):
7574
files["flu_like"] is not None and \
7675
files["covid_like"] is not None,\
7776
"files must be all present or all absent"
77+
if "flu" in params["indicator"]["types"]:
78+
assert (files["denom"] is None) == (files["flu"] is None), \
79+
"exactly one of denom and flu files are provided"
7880

7981

8082
def run_module(params: Dict[str, Dict[str, Any]]):
@@ -182,6 +184,9 @@ def run_module(params: Dict[str, Dict[str, Any]]):
182184
elif numtype == "cli":
183185
data = load_cli_data(file_dict["denom"],file_dict["flu"],file_dict["mixed"],
184186
file_dict["flu_like"],file_dict["covid_like"],dropdate_dt,"fips")
187+
elif numtype == "flu":
188+
data = load_flu_data(file_dict["denom"],file_dict["flu"],
189+
dropdate_dt,"fips")
185190
more_stats = su_inst.update_sensor(
186191
data,
187192
params["common"]["export_dir"],

changehc/delphi_changehc/update_sensor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515

1616
# first party
1717
from .config import Config
18-
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA
18+
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\
19+
SMOOTHED_FLU, SMOOTHED_ADJ_FLU, NA
1920
from .sensor import CHCSensor
2021

2122

@@ -114,9 +115,11 @@ def __init__(self,
114115
signal_name = SMOOTHED_ADJ if self.weekday else SMOOTHED
115116
elif self.numtype == "cli":
116117
signal_name = SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI
118+
elif self.numtype == "flu":
119+
signal_name = SMOOTHED_ADJ_FLU if self.weekday else SMOOTHED_FLU
117120
else:
118121
raise ValueError(f'Unsupported numtype received "{numtype}",'
119-
f' must be one of ["covid", "cli"]')
122+
f' must be one of ["covid", "cli", "flu"]')
120123
self.signal_name = add_prefix([signal_name], wip_signal=wip_signal)[0]
121124

122125
# initialize members set in shift_dates().

changehc/tests/test_load_data.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
"indicator": {
1515
"input_denom_file": "test_data/20200601_Counts_Products_Denom.dat.gz",
1616
"input_covid_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
17+
"input_flu_file": "test_data/20200601_Counts_Products_Covid.dat.gz",
1718
"drop_date": "2020-06-01"
1819
}
1920
}
2021
COVID_FILEPATH = PARAMS["indicator"]["input_covid_file"]
22+
FLU_FILEPATH = PARAMS["indicator"]["input_flu_file"]
2123
DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"]
2224
DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
2325

@@ -29,6 +31,8 @@ class TestLoadData:
2931
Config.COVID_COLS, Config.COVID_DTYPES, Config.COVID_COL)
3032
combined_data = load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE,
3133
"fips")
34+
flu_data = load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, DROP_DATE,
35+
"fips")
3236
gmpr = GeoMapper()
3337

3438
def test_base_unit(self):
@@ -43,6 +47,9 @@ def test_base_unit(self):
4347
with pytest.raises(AssertionError):
4448
load_combined_data(DENOM_FILEPATH, COVID_FILEPATH, DROP_DATE, "foo")
4549

50+
with pytest.raises(AssertionError):
51+
load_flu_data(DENOM_FILEPATH, FLU_FILEPATH, DROP_DATE, "foo")
52+
4653
def test_denom_columns(self):
4754
assert "fips" in self.denom_data.index.names
4855
assert "timestamp" in self.denom_data.index.names
@@ -71,6 +78,16 @@ def test_combined_columns(self):
7178
assert len(
7279
set(self.combined_data.columns) - set(expected_combined_columns)) == 0
7380

81+
def test_flu_columns(self):
82+
assert "fips" in self.flu_data.index.names
83+
assert "timestamp" in self.flu_data.index.names
84+
85+
expected_flu_columns = ["num", "den"]
86+
for col in expected_flu_columns:
87+
assert col in self.flu_data.columns
88+
assert len(
89+
set(self.flu_data.columns) - set(expected_flu_columns)) == 0
90+
7491
def test_edge_values(self):
7592
for data in [self.denom_data,
7693
self.covid_data,

facebook/delphiFacebook/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ importFrom(stats,na.omit)
112112
importFrom(stats,setNames)
113113
importFrom(stats,weighted.mean)
114114
importFrom(stringi,stri_extract)
115+
importFrom(stringi,stri_extract_first)
115116
importFrom(stringi,stri_replace)
116117
importFrom(stringi,stri_replace_all)
117118
importFrom(stringi,stri_split)

facebook/delphiFacebook/R/aggregate.R

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,15 @@ summarize_indicators_day <- function(day_df, indicators, target_day, geo_level,
179179
var_weight <- indicators$var_weight[row]
180180
compute_fn <- indicators$compute_fn[[row]]
181181

182+
# Prevent smoothed weighted signals from being reported for dates after
183+
# the latest available weight data.
184+
if (target_day > params$latest_weight_date &&
185+
indicators$smooth_days[row] > 1 &&
186+
indicators$var_weight[row] != "weight_unif") {
187+
188+
next
189+
}
190+
182191
# Copy only columns we're using.
183192
select_cols <- c(metric, var_weight, "weight_in_location")
184193
# This filter uses `x[, cols, with=FALSE]` rather than the newer

0 commit comments

Comments
 (0)