Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ansible/templates/backfillcorr-params-prod.json.j2
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
"cache_dir": "./cache",
"testing_window": 1,
"training_days": 270,
"lag_pad":2,
"lag_pad": 2,
"export_dir": "./receiving",
"geo_levels": ["state", "county"],
"value_types": ["count", "fraction"],
"num_col": "num",
"denom_col": "den",
"parallel": true,
"parallel_max_cores": 2,
"post": {
"aws_credentials": {
"aws_access_key_id": "{{ backfillcorr_aws_access_key_id }}",
Expand Down
3 changes: 2 additions & 1 deletion backfill_corrections/delphiBackfillCorrection/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ Imports:
parallel,
purrr,
vctrs,
RcppRoll
RcppRoll,
bettermc
Suggests:
knitr (>= 1.15),
rmarkdown (>= 1.4),
Expand Down
6 changes: 6 additions & 0 deletions backfill_corrections/delphiBackfillCorrection/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export(run_backfill)
import(covidcast)
importFrom(RcppRoll,roll_mean)
importFrom(arrow,read_parquet)
importFrom(bettermc,mclapply)
importFrom(dplyr,"%>%")
importFrom(dplyr,across)
importFrom(dplyr,arrange)
Expand All @@ -31,9 +32,11 @@ importFrom(dplyr,bind_rows)
importFrom(dplyr,desc)
importFrom(dplyr,everything)
importFrom(dplyr,filter)
importFrom(dplyr,full_join)
importFrom(dplyr,group_by)
importFrom(dplyr,group_split)
importFrom(dplyr,if_else)
importFrom(dplyr,left_join)
importFrom(dplyr,pull)
importFrom(dplyr,select)
importFrom(dplyr,starts_with)
Expand All @@ -47,7 +50,10 @@ importFrom(lubridate,make_date)
importFrom(lubridate,month)
importFrom(lubridate,year)
importFrom(parallel,detectCores)
importFrom(purrr,map)
importFrom(purrr,map_dfc)
importFrom(purrr,map_dfr)
importFrom(purrr,reduce)
importFrom(quantgen,quantile_lasso)
importFrom(readr,write_csv)
importFrom(stats,coef)
Expand Down
30 changes: 20 additions & 10 deletions backfill_corrections/delphiBackfillCorrection/R/main.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#'
#' @importFrom dplyr %>% filter group_by summarize across everything group_split ungroup
#' @importFrom tidyr drop_na
#' @importFrom purrr map map_dfr
#' @importFrom bettermc mclapply
#'
#' @export
run_backfill <- function(df, params,
Expand Down Expand Up @@ -61,7 +63,12 @@ run_backfill <- function(df, params,
group_dfs <- group_split(df, geo_value)

# Build model for each location
for (subdf in group_dfs) {
apply_fn <- ifelse(params$parallel, mclapply, lapply)
result <- apply_fn(group_dfs, function(subdf) {
# Make a copy with the same structure.
state_test_data_list <- test_data_list
state_coef_list <- coef_list

geo <- subdf$geo_value[1]

msg_ts("Processing ", geo, " geo group")
Expand Down Expand Up @@ -163,7 +170,7 @@ run_backfill <- function(df, params,
test_data <- filtered_data[[2]]

if (nrow(train_data) == 0 || nrow(test_data) == 0) {
msg_ts("Not enough data to either train or test for test_lag ",
msg_ts("Not enough data to either train or test for test lag ",
test_lag, ", skipping")
next
}
Expand All @@ -180,7 +187,6 @@ run_backfill <- function(df, params,
params_list <- c(YITL, as.vector(unlist(covariates)))

# Model training and testing
msg_ts("Training or loading models")
prediction_results <- model_training_and_testing(
train_data, test_data, taus = params$taus, covariates = params_list,
lp_solver = params$lp_solver,
Expand All @@ -197,28 +203,32 @@ run_backfill <- function(df, params,
# Model objects are saved during training, so only need to export
# output if making predictions/corrections
if (params$make_predictions) {
msg_ts("Generating predictions")
test_data <- prediction_results[[1]]
coefs <- prediction_results[[2]]
test_data <- evaluate(test_data, params$taus) %>%
exponentiate_preds(params$taus)

key <- make_key(value_type, signal_suffix)
idx <- length(test_data_list[[key]]) + 1
test_data_list[[key]][[idx]] <- test_data
coef_list[[key]][[idx]] <- coefs
idx <- length(state_test_data_list[[key]]) + 1
state_test_data_list[[key]][[idx]] <- test_data
state_coef_list[[key]][[idx]] <- coefs
}
}# End for test lags
}# End for value types
}# End for signal suffixes
}# End for geo list

return(list(coefs = state_coef_list, test_data = state_test_data_list))
}) # End for geo list

test_data_list <- map(result, ~.x$test_data)
coef_list <- map(result, ~.x$coefs)

if (params$make_predictions) {
for (value_type in params$value_types) {
for (signal_suffix in signal_suffixes) {
key <- make_key(value_type, signal_suffix)
test_combined <- bind_rows(test_data_list[[key]])
coef_combined <- bind_rows(coef_list[[key]])
test_combined <- map_dfr(test_data_list, ~.x[[key]])
coef_combined <- map_dfr(coef_list, ~.x[[key]])
export_test_result(test_combined, coef_combined,
indicator=indicator, signal=signal,
signal_suffix=signal_suffix,
Expand Down
18 changes: 11 additions & 7 deletions backfill_corrections/delphiBackfillCorrection/R/preprocessing.R
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ add_weekofmonth <- function(df, time_col, wm = WEEK_ISSUES) {
#' @template lag_col-template
#' @template ref_lag-template
#'
#' @importFrom dplyr full_join left_join
#' @importFrom purrr reduce
#' @importFrom tidyr pivot_wider drop_na
#'
#' @export
Expand All @@ -203,16 +205,21 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) {
names(avg_df)[names(avg_df) == value_col] <- 'value_7dav'
avg_df_prev7 <- add_shift(avg_df, 7, refd_col)
names(avg_df_prev7)[names(avg_df_prev7) == 'value_7dav'] <- 'value_prev_7dav'

backfill_df <- Reduce(function(x, y) merge(x, y, all=TRUE),
list(df, avg_df, avg_df_prev7))

backfill_df <- reduce(
list(df, avg_df, avg_df_prev7),
full_join, by=c(refd_col, "issue_date")
)

# Add target
target_df <- df[df$lag==ref_lag, c(refd_col, value_col, "issue_date")]
names(target_df)[names(target_df) == value_col] <- 'value_target'
names(target_df)[names(target_df) == 'issue_date'] <- 'target_date'

backfill_df <- merge(backfill_df, target_df, by=refd_col, all.x=TRUE)
backfill_df <- left_join(backfill_df, target_df, by=c(refd_col))

# Remove invalid rows
backfill_df <- drop_na(backfill_df, c(lag_col))

# Add log values
backfill_df$log_value_raw = log(backfill_df$value_raw + 1)
Expand All @@ -221,9 +228,6 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) {
backfill_df$log_value_prev_7dav = log(backfill_df$value_prev_7dav + 1)
backfill_df$log_7dav_slope = backfill_df$log_value_7dav - backfill_df$log_value_prev_7dav

# Remove invalid rows
backfill_df <- drop_na(backfill_df, c(lag_col))

return (as.data.frame(backfill_df))
}

Expand Down
4 changes: 3 additions & 1 deletion backfill_corrections/params.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@
"geo_levels": ["state", "county"],
"value_types": ["count", "fraction"],
"num_col": "num",
"denom_col": "den"
"denom_col": "den",
"parallel": false,
"parallel_max_cores": 2
}