9 | 9 | import pytest |
10 | 10 | |
11 | 11 | # first party |
| 12 | +from delphi_utils.logger import get_structured_logger |
12 | 13 | from delphi_claims_hosp.config import Config, GeoConstants |
13 | 14 | from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files |
14 | 15 | |
25 | 26 | DATA_FILEPATH = PARAMS["indicator"]["input_file"] |
26 | 27 | DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) |
27 | 28 | backfill_dir = PARAMS["indicator"]["backfill_dir"] |
28 | | -TEST_LOGGER = logging.getLogger() |
29 | 29 | |
30 | 30 | class TestBackfill: |
31 | 31 | |
32 | | - def test_store_backfill_file(self): |
33 | | - dropdate = datetime(2020, 1, 1) |
| 32 | + def cleanup(self): |
| 33 | + for file in glob.glob(f"{backfill_dir}/*.parquet"): |
| 34 | + os.remove(file) |
| 35 | + |
| 36 | + def test_store_backfill_file(self, caplog): |
| 37 | + dropdate = datetime(2020, 1, 1) |
34 | 38 | fn = "claims_hosp_as_of_20200101.parquet" |
35 | | - assert fn not in os.listdir(backfill_dir) |
36 | | - |
| 39 | + caplog.set_level(logging.INFO) |
| 40 | + logger = get_structured_logger() |
| 41 | + num_rows = len(pd.read_csv(DATA_FILEPATH)) |
| 42 | + |
37 | 43 | # Store backfill file |
38 | | - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) |
| 44 | + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) |
39 | 45 | assert fn in os.listdir(backfill_dir) |
| 46 | + assert "Stored backfill data in parquet" in caplog.text |
| 47 | + |
| 48 | + |
40 | 49 | fn = "claims_hosp_as_of_20200101.parquet" |
41 | 50 | backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow') |
42 | | - |
| 51 | + |
43 | 52 | selected_columns = ['time_value', 'fips', 'state_id', |
44 | 53 | 'num', 'den', 'lag', 'issue_date'] |
45 | | - assert set(selected_columns) == set(backfill_df.columns) |
46 | | - |
47 | | - os.remove(backfill_dir + "/" + fn) |
48 | | - assert fn not in os.listdir(backfill_dir) |
| 54 | + |
| 55 | + assert set(selected_columns) == set(backfill_df.columns) |
| 56 | + assert num_rows == len(backfill_df) |
| 57 | + |
| 58 | + self.cleanup() |
49 | 59 | |
50 | | - def test_merge_backfill_file(self): |
| 60 | + def test_merge_backfill_file(self, caplog): |
51 | 61 | fn = "claims_hosp_202006.parquet" |
52 | | - assert fn not in os.listdir(backfill_dir) |
53 | | - |
| 62 | + caplog.set_level(logging.INFO) |
| 63 | + logger = get_structured_logger() |
| 64 | + |
54 | 65 | # Check when there is no daily file to merge. |
55 | 66 | today = datetime(2020, 6, 14) |
56 | | - merge_backfill_file(backfill_dir, today, TEST_LOGGER, |
| 67 | + merge_backfill_file(backfill_dir, today, logger, |
57 | 68 | test_mode=True) |
58 | 69 | assert fn not in os.listdir(backfill_dir) |
59 | | - |
60 | | - # Generate backfill daily files |
| 70 | + assert "No new files to merge; skipping merging" in caplog.text |
| 71 | + |
| 72 | + |
| 73 | + # Generate backfill daily files |
61 | 74 | for d in range(11, 15): |
62 | | - dropdate = datetime(2020, 6, d) |
63 | | - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) |
64 | | - |
65 | | - # Check when the merged file is not generated |
| 75 | + dropdate = datetime(2020, 6, d) |
| 76 | + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) |
| 77 | + |
66 | 78 | today = datetime(2020, 7, 1) |
67 | | - merge_backfill_file(backfill_dir, today, TEST_LOGGER, |
| 79 | + merge_backfill_file(backfill_dir, today, logger, |
68 | 80 | test_mode=True) |
| 81 | + assert "Merging files" in caplog.text |
69 | 82 | assert fn in os.listdir(backfill_dir) |
70 | 83 | |
71 | 84 | # Read daily file |
72 | 85 | new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*.parquet") |
73 | | - pdList = [] |
| 86 | + pdList = [] |
74 | 87 | for file in new_files: |
75 | | - if "from" in file: |
76 | | - continue |
77 | 88 | df = pd.read_parquet(file, engine='pyarrow') |
78 | 89 | pdList.append(df) |
79 | 90 | os.remove(file) |
80 | 91 | new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet") |
81 | 92 | assert len(new_files) == 1 |
82 | 93 | |
83 | 94 | expected = pd.concat(pdList).sort_values(["time_value", "fips"]) |
84 | | - |
| 95 | + |
85 | 96 | # Read the merged file |
86 | 97 | merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow') |
87 | | - |
| 98 | + |
88 | 99 | assert set(expected.columns) == set(merged.columns) |
89 | 100 | assert expected.shape[0] == merged.shape[0] |
90 | 101 | assert expected.shape[1] == merged.shape[1] |
91 | | - |
92 | | - os.remove(backfill_dir + "/" + fn) |
93 | | - assert fn not in os.listdir(backfill_dir) |
94 | 102 | |
95 | | - def test_merge_existing_backfill_files(self): |
| 103 | + self.cleanup() |
| 104 | + |
| 105 | + def test_merge_existing_backfill_files(self, caplog): |
96 | 106 | issue_date = datetime(year=2020, month=6, day=13) |
97 | 107 | issue_date_str = issue_date.strftime("%Y%m%d") |
| 108 | + caplog.set_level(logging.INFO) |
| 109 | + logger = get_structured_logger() |
98 | 110 | def prep_backfill_data(): |
99 | 111 | # Generate backfill daily files |
100 | 112 | for d in range(11, 15): |
101 | 113 | dropdate = datetime(2020, 6, d) |
102 | | - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) |
| 114 | + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) |
103 | 115 | |
104 | | - today = datetime(2020, 6, 14) |
| 116 | + today = datetime(2020, 7, 1) |
105 | 117 | # creating expected file |
106 | | - merge_backfill_file(backfill_dir, today, TEST_LOGGER, |
| 118 | + merge_backfill_file(backfill_dir, today, logger, |
107 | 119 | test_mode=True) |
108 | | - original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet" |
| 120 | + original = f"{backfill_dir}/claims_hosp_202006.parquet" |
109 | 121 | os.rename(original, f"{backfill_dir}/expected.parquet") |
110 | 122 | |
111 | 123 | # creating backfill without issue date |
112 | 124 | os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet") |
113 | | - today = datetime(2020, 6, 14) |
114 | | - merge_backfill_file(backfill_dir, today, |
115 | | - test_mode=True, check_nd=2) |
| 125 | + merge_backfill_file(backfill_dir, today, logger, |
| 126 | + test_mode=True) |
116 | 127 | |
117 | 128 | old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*") |
118 | 129 | for file in old_files: |
119 | 130 | os.remove(file) |
120 | 131 | |
121 | 132 | prep_backfill_data() |
122 | | - file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir) |
123 | | - merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER) |
| 133 | + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) |
| 134 | + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) |
| 135 | + |
| 136 | + assert "Adding missing date to merged file" in caplog.text |
124 | 137 | |
125 | 138 | expected = pd.read_parquet(f"{backfill_dir}/expected.parquet") |
126 | | - merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet") |
| 139 | + merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_202006.parquet") |
127 | 140 | |
128 | | - check_diff = expected.merge(merged, how='left', indicator=True) |
129 | | - assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0] |
130 | | - for file in glob.glob(backfill_dir + "/*.parquet"): |
131 | | - os.remove(file) |
| 141 | + check = pd.concat([merged, expected]).drop_duplicates(keep=False) |
132 | 142 | |
| 143 | + assert len(check) == 0 |
133 | 144 | |
134 | | - def test_merge_existing_backfill_files_no_call(self): |
| 145 | + self.cleanup() |
| 146 | + |
| 147 | + |
| 148 | + def test_merge_existing_backfill_files_no_call(self, caplog): |
135 | 149 | issue_date = datetime(year=2020, month=5, day=20) |
136 | | - issue_date_str = issue_date.strftime("%Y%m%d") |
| 150 | + caplog.set_level(logging.INFO) |
| 151 | + logger = get_structured_logger() |
137 | 152 | def prep_backfill_data(): |
138 | 153 | # Generate backfill daily files |
139 | 154 | for d in range(11, 15): |
140 | 155 | dropdate = datetime(2020, 6, d) |
141 | | - store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir) |
| 156 | + store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger) |
142 | 157 | |
143 | 158 | today = datetime(2020, 6, 14) |
144 | 159 | # creating expected file |
145 | | - merge_backfill_file(backfill_dir, today, TEST_LOGGER, |
| 160 | + merge_backfill_file(backfill_dir, today, logger, |
146 | 161 | test_mode=True) |
147 | 162 | |
148 | 163 | prep_backfill_data() |
149 | | - file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir) |
150 | | - merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER) |
| 164 | + file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger) |
| 165 | + merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger) |
| 166 | + assert "Issue date has no matching merged files" in caplog.text |
151 | 167 | |
152 | | - old_files = glob.glob(backfill_dir + "*.parquet") |
153 | | - for file in old_files: |
154 | | - os.remove(file) |
| 168 | + self.cleanup() |
155 | 169 | |
156 | 170 | |
157 | 171 | |