Skip to content

Commit a193138

Browse files
committed
Flu signal
1 parent c1c954e commit a193138

File tree

5 files changed

+80
-5
lines changed

5 files changed

+80
-5
lines changed

changehc/delphi_changehc/constants.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
SMOOTHED_ADJ = "smoothed_adj_outpatient_covid"
44
SMOOTHED_CLI = "smoothed_outpatient_cli"
55
SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli"
6-
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI]
6+
SMOOTHED_FLU = "smoothed_outpatient_flu"
7+
SMOOTHED_ADJ_FLU = "smoothed_adj_outpatient_flu"
8+
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, SMOOTHED_FLU, SMOOTHED_ADJ_FLU]
79
NA = "NA"
810
HRR = "hrr"
911
FIPS = "fips"

changehc/delphi_changehc/download_ftp_files.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,30 @@ def download_cli(filedate, out_path, ftp_conn):
9393
finally:
9494
if client:
9595
client.close()
96+
97+
98+
def download_flu(filedate, out_path, ftp_conn):
99+
"""Download files necessary to create chng-flu signal from ftp server.
100+
101+
Args:
102+
filedate: YYYYmmdd string for which the files are named
103+
out_path: Path to local directory into which to download the files
104+
ftp_conn: Dict containing login credentials to ftp server
105+
"""
106+
# open client
107+
try:
108+
client = paramiko.SSHClient()
109+
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
110+
111+
client.connect(ftp_conn["host"], username=ftp_conn["user"],
112+
password=ftp_conn["pass"],
113+
port=ftp_conn["port"],
114+
allow_agent=False, look_for_keys=False)
115+
sftp = client.open_sftp()
116+
117+
sftp.chdir('/countproducts')
118+
get_files_from_dir(sftp, filedate, out_path)
119+
120+
finally:
121+
if client:
122+
client.close()

changehc/delphi_changehc/load_data.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,3 +153,36 @@ def load_cli_data(denom_filepath, flu_filepath, mixed_filepath, flu_like_filepat
153153
data = data[["num", "den"]]
154154

155155
return data
156+
157+
158+
def load_flu_data(denom_filepath, flu_filepath, dropdate, base_geo):
159+
"""Load in denominator and flu data, and combine them.
160+
161+
Args:
162+
denom_filepath: path to the aggregated denominator data
163+
flu_filepath: path to the aggregated flu data
164+
dropdate: data drop date (datetime object)
165+
base_geo: base geographic unit before aggregation ('fips')
166+
167+
Returns:
168+
combined multiindexed dataframe, index 0 is geo_base, index 1 is date
169+
"""
170+
assert base_geo == "fips", "base unit must be 'fips'"
171+
172+
# load each data stream
173+
denom_data = load_chng_data(denom_filepath, dropdate, base_geo,
174+
Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL)
175+
flu_data = load_chng_data(flu_filepath, dropdate, base_geo,
176+
Config.FLU_COLS, Config.FLU_DTYPES, Config.FLU_COL)
177+
178+
# merge data
179+
data = denom_data.merge(flu_data, how="outer", left_index=True, right_index=True)
180+
assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge"
181+
182+
# calculate combined numerator and denominator
183+
data.fillna(0, inplace=True)
184+
data["num"] = data[Config.FLU_COL]
185+
data["den"] = data[Config.DENOM_COL]
186+
data = data[["num", "den"]]
187+
188+
return data

changehc/delphi_changehc/run.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
from delphi_utils import get_structured_logger
1515

1616
# first party
17-
from .download_ftp_files import download_covid, download_cli
18-
from .load_data import load_combined_data, load_cli_data
17+
from .download_ftp_files import download_covid, download_cli, download_flu
18+
from .load_data import load_combined_data, load_cli_data, load_flu_data
1919
from .update_sensor import CHCSensorUpdater
2020

2121

@@ -30,6 +30,8 @@ def retrieve_files(params, filedate, logger):
3030
download_covid(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])
3131
if "cli" in params["indicator"]["types"]:
3232
download_cli(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])
33+
if "flu" in params["indicator"]["types"]:
34+
download_flu(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"])
3335

3436
denom_file = "%s/%s_Counts_Products_Denom.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
3537
covid_file = "%s/%s_Counts_Products_Covid.dat.gz" % (params["indicator"]["input_cache_dir"],filedate)
@@ -53,6 +55,8 @@ def retrieve_files(params, filedate, logger):
5355
file_dict["mixed"] = mixed_file
5456
file_dict["flu_like"] = flu_like_file
5557
file_dict["covid_like"] = covid_like_file
58+
if "flu" in params["indicator"]["types"]:
59+
file_dict["flu"] = flu_file
5660
return file_dict
5761

5862

@@ -75,6 +79,9 @@ def make_asserts(params):
7579
files["flu_like"] is not None and \
7680
files["covid_like"] is not None,\
7781
"files must be all present or all absent"
82+
if "flu" in params["indicator"]["types"]:
83+
assert (files["denom"] is None) == (files["flu"] is None), \
84+
"exactly one of denom and flu files are provided"
7885

7986

8087
def run_module(params: Dict[str, Dict[str, Any]]):
@@ -182,6 +189,9 @@ def run_module(params: Dict[str, Dict[str, Any]]):
182189
elif numtype == "cli":
183190
data = load_cli_data(file_dict["denom"],file_dict["flu"],file_dict["mixed"],
184191
file_dict["flu_like"],file_dict["covid_like"],dropdate_dt,"fips")
192+
elif numtype == "flu":
193+
data = load_flu_data(file_dict["denom"],file_dict["flu"],
194+
dropdate_dt,"fips")
185195
more_stats = su_inst.update_sensor(
186196
data,
187197
params["common"]["export_dir"],

changehc/delphi_changehc/update_sensor.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515

1616
# first party
1717
from .config import Config
18-
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA
18+
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI,\
19+
SMOOTHED_FLU, SMOOTHED_ADJ_FLU, NA
1920
from .sensor import CHCSensor
2021

2122

@@ -114,9 +115,11 @@ def __init__(self,
114115
signal_name = SMOOTHED_ADJ if self.weekday else SMOOTHED
115116
elif self.numtype == "cli":
116117
signal_name = SMOOTHED_ADJ_CLI if self.weekday else SMOOTHED_CLI
118+
elif self.numtype == "flu":
119+
signal_name = SMOOTHED_ADJ_FLU if self.weekday else SMOOTHED_FLU
117120
else:
118121
raise ValueError(f'Unsupported numtype received "{numtype}",'
119-
f' must be one of ["covid", "cli"]')
122+
f' must be one of ["covid", "cli", "flu"]')
120123
self.signal_name = add_prefix([signal_name], wip_signal=wip_signal)[0]
121124

122125
# initialize members set in shift_dates().

0 commit comments

Comments
 (0)