11# The COVID Hospitalization Production Forecasting Pipeline.
22source(" scripts/targets-common.R" )
33
4- submit_climatological <- FALSE
4+ submit_climatological <- TRUE
55submission_directory <- Sys.getenv(" COVID_SUBMISSION_DIRECTORY" , " cache" )
66insufficient_data_geos <- c(" as" , " mp" , " vi" , " gu" )
77# date to cut the truth data off at, so we don't have too much of the past
@@ -15,66 +15,112 @@ forecast_generation_date <- Sys.Date()
1515# Usually, the forecast_date is the same as the generation date, but you can
1616# override this. It should be a Wednesday.
1717forecast_date <- round_date(forecast_generation_date , " weeks" , week_start = 3 )
18- # If doing backfill, you can set the forecast_date to a sequence of dates.
19- # forecast_date <- seq.Date(as.Date("2024-11-20"), Sys.Date(), by = 7L)
2018# forecast_generation_date needs to follow suit, but it's more complicated
2119# because sometimes we forecast on Thursday.
22- # forecast_generation_date <- c(as.Date(c("2024-11-21", "2024-11-27", "2024-12-04", "2024-12-11", "2024-12-18", "2024-12-26", "2025-01-02")), seq.Date(as.Date("2025-01-08"), Sys.Date(), by = 7L))
20+ # forecast_generation_date <- c(as.Date(c("2024-11-20", "2024-11-27", "2024-12-04", "2024-12-11", "2024-12-18", "2024-12-26", "2025-01-02")), seq.Date(as.Date("2025-01-08"), Sys.Date(), by = 7L))
21+ # If doing backfill, you can set the forecast_date to a sequence of dates.
22+ # forecast_date <- seq.Date(as.Date("2024-11-20"), Sys.Date(), by = 7L)
2323
2424forecaster_fns <- list2(
25- linear = function (... ) {
26- forecaster_baseline_linear(... , residual_tail = 0.97 , residual_center = 0.097 , no_intercept = TRUE )
25+ linear = function (epi_data , ahead , extra_data , ... ) {
26+ forecaster_baseline_linear(epi_data , ahead , ... , residual_tail = 0.97 , residual_center = 0.097 , no_intercept = TRUE )
2727 },
28- # linearlog = function(...) {
28+ # linearlog = function(epi_data, ahead, extra_data, ...) {
2929 # forecaster_baseline_linear(..., log = TRUE)
3030 # },
31- climate_base = function (... ) {
31+ climate_base = function (epi_data , ahead , extra_data , ... ) {
3232 climatological_model(
33- ... ,
33+ epi_data , ahead , ... ,
3434 )
3535 },
36- climate_geo_agged = function (... ) {
36+ climate_geo_agged = function (epi_data , ahead , extra_data , ... ) {
3737 climatological_model(
38- ... ,
38+ epi_data , ahead , ... ,
3939 geo_agg = TRUE
4040 )
4141 },
42+ windowed_seasonal_extra_sources = function (epi_data , ahead , extra_data , ... ) {
43+ fcst <-
44+ epi_data %> %
45+ left_join(extra_data , by = join_by(geo_value , time_value )) %> %
46+ scaled_pop_seasonal(
47+ outcome = " value" ,
48+ ahead = ahead * 7 ,
49+ extra_sources = " nssp" ,
50+ ... ,
51+ seasonal_method = " window" ,
52+ trainer = epipredict :: quantile_reg(),
53+ drop_non_seasons = TRUE ,
54+ pop_scaling = FALSE ,
55+ lags = list (c(0 , 7 ), c(0 , 7 ))
56+ ) %> %
57+ mutate(target_end_date = target_end_date + 3 )
58+ fcst
59+ }
4260)
61+ indices <- seq_along(forecaster_fns )
4362
4463rlang :: list2(
4564 tar_target(aheads , command = - 1 : 3 ),
46- tar_target(forecasters , command = seq_along( forecaster_fns ) ),
65+ tar_target(forecasters , command = indices ),
4766 tar_target(
48- nhsn_latest_data ,
67+ download_latest ,
4968 command = {
5069 if (wday(Sys.Date()) < 6 & wday(Sys.Date()) > 3 ) {
5170 # download from the preliminary data source from Wednesday to Friday
5271 most_recent_result <- readr :: read_csv(" https://data.cdc.gov/resource/mpgq-jmmr.csv?$limit=20000&$select=weekendingdate,jurisdiction,totalconfc19newadm,totalconfflunewadm" )
5372 } else {
5473 most_recent_result <- readr :: read_csv(" https://data.cdc.gov/resource/ua7e-t2fy.csv?$limit=20000&$select=weekendingdate,jurisdiction,totalconfc19newadm,totalconfflunewadm" )
5574 }
56- most_recent_result %> %
75+ most_recent_result <-
76+ most_recent_result %> %
5777 process_nhsn_data() %> %
5878 filter(disease == " nhsn_covid" ) %> %
5979 select(- disease ) %> %
6080 filter(geo_value %nin % insufficient_data_geos )
81+ # if there's not already a result we need to save it no matter what
82+ if (file.exists(here :: here(" .nhsn_covid_cache.parquet" ))) {
83+ previous_result <- qs :: qread(here :: here(" .nhsn_covid_cache.parquet" ))
84+ } else
85+ # if something is different, update the file
86+ if (! isTRUE(all.equal(previous_result , most_recent_result ))) {
87+ qs :: qsave(most_recent_result , here :: here(" .nhsn_covid_cache.parquet" ))
88+ } else {
89+ qs :: qsave(most_recent_result , here :: here(" .nhsn_covid_cache.parquet" ))
90+ }
91+ NULL
6192 },
93+ description = " Download the result, and update the file only if it's actually different" ,
94+ priority = 1 ,
6295 cue = tar_cue(mode = " always" )
96+ ),
97+ tar_change(
98+ name = nhsn_latest_data ,
99+ command = {
100+ qs :: qread(here :: here(" .nhsn_flu_cache.parquet" ))
101+ },
102+ change = tools :: md5sum(here :: here(" .nhsn_flu_cache.parquet" ))
63103 ),
64104 tar_target(
65105 name = nhsn_archive_data ,
66106 command = {
67107 create_nhsn_data_archive(disease = " nhsn_covid" )
68108 }
69109 ),
110+ tar_target(
111+ current_nssp_archive ,
112+ command = {
113+ up_to_date_nssp_state_archive(" covid" )
114+ },
115+ cue = tar_cue(mode = " always" )
116+ ),
70117 tar_map(
71- values = tidyr :: expand_grid(
72- tibble(
118+ values = tibble(
73119 forecast_date_int = forecast_date ,
74- forecast_generation_date_int = forecast_generation_date
75- )
76- ),
77- names = " forecast_date " ,
120+ forecast_generation_date_int = forecast_generation_date ,
121+ forecast_date_chr = as.character( forecast_date_int )
122+ ),
123+ names = " forecast_date_chr " ,
78124 tar_target(
79125 name = geo_forecasters_weights ,
80126 command = {
@@ -84,7 +130,6 @@ rlang::list2(
84130 }
85131 geo_forecasters_weights
86132 },
87- cue = tar_cue(mode = " always" )
88133 ),
89134 tar_target(
90135 name = geo_exclusions ,
@@ -97,41 +142,46 @@ rlang::list2(
97142 command = {
98143 if (as.Date(forecast_generation_date_int ) < Sys.Date()) {
99144 train_data <- nhsn_archive_data %> %
100- epix_as_of(as.Date(forecast_generation_date_int )) %> %
145+ epix_as_of(min(forecast_date , current_nssp_archive $ versions_end )) %> %
146+ add_season_info() %> %
101147 mutate(
102148 geo_value = ifelse(geo_value == " usa" , " us" , geo_value ),
103149 time_value = time_value - 3
104- ) %> %
105- add_season_info()
150+ )
106151 } else {
107152 train_data <-
108153 nhsn_latest_data %> %
109154 data_substitutions(disease = " covid" ) %> %
110155 as_epi_df(as_of = as.Date(forecast_date_int ))
111156 }
157+ nssp <- current_nssp_archive %> %
158+ epix_as_of(min(forecast_date , current_nssp_archive $ versions_end )) %> %
159+ mutate(time_value = time_value )
112160 attributes(train_data )$ metadata $ as_of <- as.Date(forecast_date_int )
161+ print(names(forecaster_fns [forecasters ]))
113162 train_data %> %
114- forecaster_fns [[forecasters ]](ahead = aheads ) %> %
163+ forecaster_fns [[forecasters ]](ahead = aheads , extra_data = nssp ) %> %
115164 mutate(
116165 forecaster = names(forecaster_fns [forecasters ]),
117166 geo_value = as.factor(geo_value )
118167 )
119168 },
120- pattern = cross(aheads , forecasters ),
121- cue = tar_cue(mode = " always" )
169+ pattern = cross(aheads , forecasters )
122170 ),
123171 tar_target(
124172 name = ensemble_res ,
125173 command = {
126174 forecast_res %> %
127- mutate(quantile = round(quantile , digits = 3 )) %> %
128- left_join(geo_forecasters_weights , by = join_by(forecast_date , forecaster , geo_value )) %> %
129- mutate(value = value * weight ) %> %
130- group_by(forecast_date , geo_value , target_end_date , quantile ) %> %
131- summarize(value = sum(value , na.rm = TRUE ), .groups = " drop" ) %> %
132- filter(geo_value %nin % geo_exclusions )
175+ ensemble_linear_climate(
176+ aheads ,
177+ other_weights = geo_forecasters_weights ,
178+ max_climate_ahead_weight = 0.6 ,
179+ max_climate_quantile_weight = 0.6
180+ ) %> %
181+ filter(geo_value %nin % geo_exclusions ) %> %
182+ ungroup() %> %
183+ sort_by_quantile()
133184 },
134- cue = tar_cue(mode = " always" )
135185 ),
136186 tar_target(
137187 name = ensemble_mixture_res ,
@@ -144,7 +194,13 @@ rlang::list2(
144194 max_climate_quantile_weight = 0.6
145195 ) %> %
146196 filter(geo_value %nin % geo_exclusions ) %> %
147- ungroup()
197+ ungroup() %> %
198+ bind_rows(forecast_res %> %
199+ filter(forecaster == " windowed_seasonal_extra_sources" ) %> %
200+ filter(forecast_date < target_end_date )) %> % # don't use for neg aheads
201+ group_by(geo_value , forecast_date , target_end_date , quantile ) %> %
202+ summarize(value = mean(value , na.rm = TRUE ), .groups = " drop" ) %> %
203+ sort_by_quantile()
148204 },
149205 ),
150206 tar_target(
@@ -168,7 +224,6 @@ rlang::list2(
168224 format_flusight(disease = " covid" ) %> %
169225 write_submission_file(forecast_reference_date , file.path(submission_directory , " model-output/CMU-TimeSeries" ))
170226 },
171- cue = tar_cue(mode = " always" )
172227 ),
173228 tar_target(
174229 name = make_climate_submission_csv ,
@@ -181,14 +236,14 @@ rlang::list2(
181236 summarize(forecast_date = first(forecast_date ), value = mean(value , na.rm = TRUE ), .groups = " drop" ) %> %
182237 ungroup() %> %
183238 format_flusight(disease = " covid" ) %> %
239+ filter(location %nin % c(" 60" , " 66" , " 78" )) %> %
184240 write_submission_file(
185241 get_forecast_reference_date(forecast_date_int ),
186- submission_directory = file.path(submission_directory , " model-output/CMU-climatological-baseline " ),
187- file_name = " CMU-climatological-baseline "
242+ submission_directory = file.path(submission_directory , " model-output/CMU-climate_baseline " ),
243+ file_name = " CMU-climate_baseline "
188244 )
189245 }
190246 },
191- cue = tar_cue(mode = " always" )
192247 ),
193248 tar_target(
194249 name = validate_result ,
@@ -205,7 +260,6 @@ rlang::list2(
205260 }
206261 validation
207262 },
208- cue = tar_cue(mode = " always" )
209263 ),
210264 tar_target(
211265 name = validate_climate_result ,
@@ -215,14 +269,13 @@ rlang::list2(
215269 if (submission_directory != " cache" && submit_climatological ) {
216270 validation <- validate_submission(
217271 submission_directory ,
218- file_path = sprintf(" CMU-climatological-baseline /%s-CMU-climatological-baseline .csv" , get_forecast_reference_date(forecast_date_int ))
272+ file_path = sprintf(" CMU-climate_baseline /%s-CMU-climate_baseline .csv" , get_forecast_reference_date(forecast_date_int ))
219273 )
220274 } else {
221275 validation <- " not validating when there is no hub (set submission_directory)"
222276 }
223277 validation
224278 },
225- cue = tar_cue(mode = " always" )
226279 ),
227280 tar_target(
228281 name = truth_data ,
@@ -261,7 +314,6 @@ rlang::list2(
261314 select(- rel_max_value )
262315 truth_data %> % bind_rows(nssp_renormalized )
263316 },
264- cue = tar_cue(mode = " always" )
265317 ),
266318 tar_target(
267319 notebook ,
@@ -271,7 +323,7 @@ rlang::list2(
271323 " scripts/reports/forecast_report.Rmd" ,
272324 output_file = here :: here(
273325 " reports" ,
274- sprintf(" %s_covid_prod_on_%s.html" , as.Date(forecast_date_int ), as.Date(forecast_generation_date_int ))
326+ sprintf(" %s_covid_prod_on_%s.html" , as.Date(forecast_date_int ), as.Date(Sys.Date() ))
275327 ),
276328 params = list (
277329 disease = " covid" ,
0 commit comments