
Commit 1bb84d8

feat: adding patching with backfill
1 parent 5b9e69a commit 1bb84d8

File tree

5 files changed: +200 −17 lines changed


claims_hosp/delphi_claims_hosp/backfill.py

Lines changed: 58 additions & 6 deletions
@@ -5,15 +5,18 @@
 Created: 2022-08-03

 """
-import os
+
 import glob
+import os
+import re
+import shutil
 from datetime import datetime
+from typing import Union

 # third party
 import pandas as pd
 from delphi_utils import GeoMapper

-
 from .config import Config

 gmpr = GeoMapper()
@@ -69,9 +72,58 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
         "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
     # Store intermediate file into the backfill folder
     backfilldata.to_parquet(path, index=False)
+    return path
+
+
+def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
+    """
+    Merge patch data into the existing merged backfill file covering the issue date.
+
+    Parameters
+    ----------
+    issue_date : datetime
+        The issue date of the patch data being merged
+    backfill_dir : str
+        specified path to store backfill files.
+    backfill_file : str
+        path to the newly stored patch backfill file
+    """
+    new_files = glob.glob(backfill_dir + "/claims_hosp_*")
+
+    def get_file_with_date(files) -> Union[str, None]:
+        for filename in files:
+            # merged files carry two 8-digit dates: claims_hosp_from_<start>_to_<end>
+            pattern = re.findall(r"\d{8}", filename)
+            if len(pattern) == 2:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                end_date = datetime.strptime(pattern[1], "%Y%m%d")
+                if start_date <= issue_date <= end_date:
+                    return filename
+        return ""
+
+    file_name = get_file_with_date(new_files)
+
+    if len(file_name) == 0:
+        logger.info("patch file is too recent to merge", issue_date=issue_date.strftime("%Y-%m-%d"))
+        return
+
+    # Start to merge files
+    merge_file = f"{file_name.rsplit('.', 1)[0]}_after_merge.parquet"
+    try:
+        shutil.copyfile(file_name, merge_file)
+        existing_df = pd.read_parquet(merge_file, engine="pyarrow")
+        df = pd.read_parquet(backfill_file, engine="pyarrow")
+        merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
+        merged_df.to_parquet(merge_file, index=False)
+        os.remove(file_name)
+        os.rename(merge_file, file_name)
+    # pylint: disable=W0703
+    except Exception as e:
+        os.remove(merge_file)
+        logger.error(e)
+        return
+
 
-def merge_backfill_file(backfill_dir, backfill_merge_day, today,
-                        test_mode=False, check_nd=25):
+def merge_backfill_file(backfill_dir, backfill_merge_day, most_recent, test_mode=False, check_nd=25):
     """
     Merge ~4 weeks' backfill data into one file.
 
@@ -80,7 +132,7 @@ def merge_backfill_file(backfill_dir, backfill_merge_day, today,
     threshold to allow flexibility in data delivery.
     Parameters
     ----------
-    today : datetime
+    most_recent : datetime
         The most recent date when the raw data is received
     backfill_dir : str
         specified path to store backfill files.
@@ -109,7 +161,7 @@ def get_date(file_link):
 
     # Check whether to merge
     # Check the number of files that are not merged
-    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
+    if most_recent.weekday() != backfill_merge_day or (most_recent - earliest_date).days <= check_nd:
         return
 
     # Start to merge files
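
For orientation, here is a minimal sketch of how the new merge path is driven (it mirrors the test added further down; the claims file name, directory, and issue date are placeholders). `store_backfill_file` writes a daily `claims_hosp_as_of_YYYYMMDD.parquet` file, and `merge_existing_backfill_files` folds it into the merged `claims_hosp_from_YYYYMMDD_to_YYYYMMDD.parquet` file whose date range covers the issue date:

import logging
from datetime import datetime

from delphi_claims_hosp.backfill import merge_existing_backfill_files, store_backfill_file

logger = logging.getLogger()
issue_date = datetime(2020, 6, 13)  # placeholder issue date

# Store the patch day's data as a daily backfill file, then merge it into
# the existing merged file whose date range covers 2020-06-13, if any.
daily_file = store_backfill_file("SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", issue_date, "./backfill")
merge_existing_backfill_files("./backfill", daily_file, issue_date, logger)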

claims_hosp/delphi_claims_hosp/run.py

Lines changed: 17 additions & 7 deletions
@@ -5,22 +5,24 @@
 when the module is run with `python -m delphi_claims_hosp`.
 """
 
+import os
+
 # standard packages
 import time
-import os
 from datetime import datetime, timedelta
 from pathlib import Path
 
 # third party
 from delphi_utils import get_structured_logger
 
+from .backfill import merge_backfill_file, merge_existing_backfill_files, store_backfill_file
+
 # first party
 from .config import Config
 from .download_claims_ftp_files import download
-from .modify_claims_drops import modify_and_write
 from .get_latest_claims_name import get_latest_filename
+from .modify_claims_drops import modify_and_write
 from .update_indicator import ClaimsHospIndicatorUpdater
-from .backfill import (store_backfill_file, merge_backfill_file)
 
 
 def run_module(params, logger=None):
@@ -56,6 +58,10 @@ def run_module(params, logger=None):
     start_time = time.time()
     issue_date_str = params.get("patch", {}).get("current_issue", None)
     issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d")
+    # safety check: patch parameters may exist in the params file even when
+    # this is not a custom run/patch, so gate on the indicator's custom_run flag
+    custom_run_flag = params["indicator"].get("custom_run", False)
     if not logger:
         logger = get_structured_logger(
             __name__,
@@ -64,8 +70,7 @@ def run_module(params, logger=None):
         )
 
     # pull latest data
-    download(params["indicator"]["ftp_credentials"],
-             params["indicator"]["input_dir"], logger, issue_date=issue_date)
+    download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date)
 
     # aggregate data
     modify_and_write(params["indicator"]["input_dir"], logger)
@@ -99,8 +104,13 @@ def run_module(params, logger=None):
     if params["indicator"].get("generate_backfill_files", True):
         backfill_dir = params["indicator"]["backfill_dir"]
         backfill_merge_day = params["indicator"]["backfill_merge_day"]
-        merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
-        store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+        if custom_run_flag:
+            backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+            merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
+        else:
+            merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
+            store_backfill_file(claims_file, dropdate_dt, backfill_dir)
 
     # print out information
     logger.info("Loaded params",
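
For reference, a hypothetical params fragment showing only the keys this new branch reads (every value here is a placeholder; a real params file also carries ftp_credentials, input_dir, and the other indicator settings used above):

params = {
    "indicator": {
        "custom_run": True,                # True routes the run into merge_existing_backfill_files
        "generate_backfill_files": True,
        "backfill_dir": "./backfill",
        "backfill_merge_day": 2,           # weekday number (0 = Monday) compared against weekday()
    },
    "patch": {"current_issue": "2021-01-01"},  # parsed above with "%Y-%m-%d"
}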

claims_hosp/tests/test_backfill.py

Lines changed: 72 additions & 3 deletions
@@ -1,27 +1,32 @@
+import logging
 import os
 import glob
 from datetime import datetime
+from pathlib import Path
+import shutil
 
 # third party
 import pandas as pd
 import pytest
 
 # first party
 from delphi_claims_hosp.config import Config, GeoConstants
-from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file
+from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files
 
 CONFIG = Config()
 CONSTANTS = GeoConstants()
+TEST_PATH = Path(__file__).parent
 PARAMS = {
     "indicator": {
-        "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
-        "backfill_dir": "./backfill",
+        "input_file": f"{TEST_PATH}/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
+        "backfill_dir": f"{TEST_PATH}/backfill",
         "drop_date": "2020-06-11",
     }
 }
 DATA_FILEPATH = PARAMS["indicator"]["input_file"]
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
 backfill_dir = PARAMS["indicator"]["backfill_dir"]
+TEST_LOGGER = logging.getLogger()
 
 class TestBackfill:
 
@@ -95,3 +100,67 @@ def test_merge_backfill_file(self):
 
         os.remove(backfill_dir + "/" + fn)
         assert fn not in os.listdir(backfill_dir)
+
+    def test_merge_existing_backfill_files(self):
+        issue_date = datetime(year=2020, month=6, day=13)
+        issue_date_str = issue_date.strftime("%Y%m%d")
+        def prep_backfill_data():
+            # Generate backfill daily files
+            for d in range(11, 15):
+                dropdate = datetime(2020, 6, d)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+
+            today = datetime(2020, 6, 14)
+            # create the expected merged file from all four daily files
+            merge_backfill_file(backfill_dir, today.weekday(), today,
+                                test_mode=True, check_nd=2)
+            original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"
+            os.rename(original, f"{backfill_dir}/expected.parquet")
+
+            # re-create the merged backfill file without the issue date's data
+            os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
+            today = datetime(2020, 6, 14)
+            merge_backfill_file(backfill_dir, today.weekday(), today,
+                                test_mode=True, check_nd=2)
+
+            old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
+            for file in old_files:
+                os.remove(file)
+
+        prep_backfill_data()
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+
+        expected = pd.read_parquet(f"{backfill_dir}/expected.parquet")
+        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet")
+
+        check_diff = expected.merge(merged, how='left', indicator=True)
+        assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0]
+        for file in glob.glob(backfill_dir + "/*.parquet"):
+            os.remove(file)
+
+
+    def test_merge_existing_backfill_files_no_call(self):
+        issue_date = datetime(year=2020, month=6, day=20)
+        def prep_backfill_data():
+            # Generate backfill daily files
+            for d in range(11, 15):
+                dropdate = datetime(2020, 6, d)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+
+            today = datetime(2020, 6, 14)
+            # attempt a merge; with check_nd=8 there are too few days, so no merged file is created
+            merge_backfill_file(backfill_dir, today.weekday(), today,
+                                test_mode=True, check_nd=8)
+
+        prep_backfill_data()
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+
+        old_files = glob.glob(backfill_dir + "/*.parquet")
+        for file in old_files:
+            os.remove(file)
+
+

claims_hosp/tests/test_download_claims_ftp_files.py

Lines changed: 22 additions & 1 deletion
@@ -1,19 +1,40 @@
 # standard
 import datetime
 import re
+from mock import MagicMock, patch
+import logging
 
 # third party
 import numpy as np
+from freezegun import freeze_time
 
 # first party
 from delphi_claims_hosp.download_claims_ftp_files import (change_date_format,
-                                                          get_timestamp)
+                                                          get_timestamp, download)
 
 OLD_FILENAME_TIMESTAMP = re.compile(
     r".*EDI_AGG_INPATIENT_[0-9]_(?P<ymd>[0-9]*)_(?P<hm>[0-9]*)[^0-9]*")
 NEW_FILENAME_TIMESTAMP = re.compile(r".*EDI_AGG_INPATIENT_(?P<ymd>[0-9]*)_(?P<hm>[0-9]*)[^0-9]*")
 
+TEST_LOGGER = logging.getLogger()
 class TestDownloadClaimsFtpFiles:
+
+    @patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient')
+    @patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False)
+    def test_download(self, mock_exists, mock_sshclient):
+        mock_sshclient_instance = MagicMock()
+        mock_sshclient.return_value = mock_sshclient_instance
+        mock_sftp = MagicMock()
+        mock_sshclient_instance.open_sftp.return_value = mock_sftp
+        mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")]
+        ftp_credentials = {"host": "test_host", "user": "test_user", "pass": "test_pass", "port": "test_port"}
+        out_path = "./test_data/"
+
+        issue_date = datetime.datetime(2020, 11, 7)
+        download(ftp_credentials, out_path, TEST_LOGGER, issue_date=issue_date)
+        mock_sshclient_instance.connect.assert_called_once_with(ftp_credentials["host"], username=ftp_credentials["user"], password=ftp_credentials["pass"], port=ftp_credentials["port"])
+        mock_sftp.get.assert_called()
+
 
     def test_change_date_format(self):
         name = "SYNEDI_AGG_INPATIENT_20200611_1451CDT"

claims_hosp/tests/test_patch.py

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+import unittest
+from unittest.mock import patch as mock_patch
+from delphi_claims_hosp.patch import patch
+import os
+import shutil
+
+class TestPatchModule:
+    def test_patch(self):
+        with mock_patch('delphi_claims_hosp.patch.get_structured_logger') as mock_get_structured_logger, \
+             mock_patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \
+             mock_patch('delphi_claims_hosp.patch.run_module') as mock_run_module:
+
+            mock_read_params.return_value = {
+                "common": {
+                    "log_filename": "test.log"
+                },
+                "patch": {
+                    "start_issue": "2021-01-01",
+                    "end_issue": "2021-01-02",
+                    "patch_dir": "./patch_dir"
+                }
+            }
+
+            patch()
+
+            assert os.path.isdir('./patch_dir')
+            assert os.path.isdir('./patch_dir/issue_20210101/hospital-admissions')
+            assert os.path.isdir('./patch_dir/issue_20210102/hospital-admissions')
+
+            # Clean up the created directories after the test
+            shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"])
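
Judging from these assertions, patch() is expected to create one issue directory per day in the [start_issue, end_issue] range, each holding a hospital-admissions export subfolder:

patch_dir/
├── issue_20210101/hospital-admissions/
└── issue_20210102/hospital-admissions/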
