diff --git a/ansible/templates/backfillcorr-params-prod.json.j2 b/ansible/templates/backfillcorr-params-prod.json.j2 index b6c08fa59..d4ea9280d 100644 --- a/ansible/templates/backfillcorr-params-prod.json.j2 +++ b/ansible/templates/backfillcorr-params-prod.json.j2 @@ -4,12 +4,14 @@ "cache_dir": "./cache", "testing_window": 1, "training_days": 270, - "lag_pad":2, + "lag_pad": 2, "export_dir": "./receiving", "geo_levels": ["state", "county"], "value_types": ["count", "fraction"], "num_col": "num", "denom_col": "den", + "parallel": true, + "parallel_max_cores": 2, "post": { "aws_credentials": { "aws_access_key_id": "{{ backfillcorr_aws_access_key_id }}", diff --git a/backfill_corrections/delphiBackfillCorrection/DESCRIPTION b/backfill_corrections/delphiBackfillCorrection/DESCRIPTION index b42519175..a66c80353 100644 --- a/backfill_corrections/delphiBackfillCorrection/DESCRIPTION +++ b/backfill_corrections/delphiBackfillCorrection/DESCRIPTION @@ -27,7 +27,8 @@ Imports: parallel, purrr, vctrs, - RcppRoll + RcppRoll, + bettermc Suggests: knitr (>= 1.15), rmarkdown (>= 1.4), diff --git a/backfill_corrections/delphiBackfillCorrection/NAMESPACE b/backfill_corrections/delphiBackfillCorrection/NAMESPACE index c1bb46628..665c41e49 100644 --- a/backfill_corrections/delphiBackfillCorrection/NAMESPACE +++ b/backfill_corrections/delphiBackfillCorrection/NAMESPACE @@ -23,6 +23,7 @@ export(run_backfill) import(covidcast) importFrom(RcppRoll,roll_mean) importFrom(arrow,read_parquet) +importFrom(bettermc,mclapply) importFrom(dplyr,"%>%") importFrom(dplyr,across) importFrom(dplyr,arrange) @@ -31,9 +32,11 @@ importFrom(dplyr,bind_rows) importFrom(dplyr,desc) importFrom(dplyr,everything) importFrom(dplyr,filter) +importFrom(dplyr,full_join) importFrom(dplyr,group_by) importFrom(dplyr,group_split) importFrom(dplyr,if_else) +importFrom(dplyr,left_join) importFrom(dplyr,pull) importFrom(dplyr,select) importFrom(dplyr,starts_with) @@ -47,7 +50,10 @@ importFrom(lubridate,make_date) importFrom(lubridate,month) importFrom(lubridate,year) importFrom(parallel,detectCores) +importFrom(purrr,map) importFrom(purrr,map_dfc) +importFrom(purrr,map_dfr) +importFrom(purrr,reduce) importFrom(quantgen,quantile_lasso) importFrom(readr,write_csv) importFrom(stats,coef) diff --git a/backfill_corrections/delphiBackfillCorrection/R/main.R b/backfill_corrections/delphiBackfillCorrection/R/main.R index ae5779d03..b9b0fc892 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/main.R +++ b/backfill_corrections/delphiBackfillCorrection/R/main.R @@ -11,6 +11,8 @@ #' #' @importFrom dplyr %>% filter group_by summarize across everything group_split ungroup #' @importFrom tidyr drop_na +#' @importFrom purrr map map_dfr +#' @importFrom bettermc mclapply #' #' @export run_backfill <- function(df, params, @@ -61,7 +63,12 @@ run_backfill <- function(df, params, group_dfs <- group_split(df, geo_value) # Build model for each location - for (subdf in group_dfs) { + apply_fn <- ifelse(params$parallel, mclapply, lapply) + result <- apply_fn(group_dfs, function(subdf) { + # Make a copy with the same structure. + state_test_data_list <- test_data_list + state_coef_list <- coef_list + geo <- subdf$geo_value[1] msg_ts("Processing ", geo, " geo group") @@ -163,7 +170,7 @@ run_backfill <- function(df, params, test_data <- filtered_data[[2]] if (nrow(train_data) == 0 || nrow(test_data) == 0) { - msg_ts("Not enough data to either train or test for test_lag ", + msg_ts("Not enough data to either train or test for test lag ", test_lag, ", skipping") next } @@ -180,7 +187,6 @@ run_backfill <- function(df, params, params_list <- c(YITL, as.vector(unlist(covariates))) # Model training and testing - msg_ts("Training or loading models") prediction_results <- model_training_and_testing( train_data, test_data, taus = params$taus, covariates = params_list, lp_solver = params$lp_solver, @@ -197,28 +203,32 @@ run_backfill <- function(df, params, # Model objects are saved during training, so only need to export # output if making predictions/corrections if (params$make_predictions) { - msg_ts("Generating predictions") test_data <- prediction_results[[1]] coefs <- prediction_results[[2]] test_data <- evaluate(test_data, params$taus) %>% exponentiate_preds(params$taus) key <- make_key(value_type, signal_suffix) - idx <- length(test_data_list[[key]]) + 1 - test_data_list[[key]][[idx]] <- test_data - coef_list[[key]][[idx]] <- coefs + idx <- length(state_test_data_list[[key]]) + 1 + state_test_data_list[[key]][[idx]] <- test_data + state_coef_list[[key]][[idx]] <- coefs } }# End for test lags }# End for value types }# End for signal suffixes - }# End for geo list + + return(list(coefs = state_coef_list, test_data = state_test_data_list)) + }) # End for geo list + + test_data_list <- map(result, ~.x$test_data) + coef_list <- map(result, ~.x$coefs) if (params$make_predictions) { for (value_type in params$value_types) { for (signal_suffix in signal_suffixes) { key <- make_key(value_type, signal_suffix) - test_combined <- bind_rows(test_data_list[[key]]) - coef_combined <- bind_rows(coef_list[[key]]) + test_combined <- map_dfr(test_data_list, ~.x[[key]]) + coef_combined <- map_dfr(coef_list, ~.x[[key]]) export_test_result(test_combined, coef_combined, indicator=indicator, signal=signal, signal_suffix=signal_suffix, diff --git a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R index 7536a4ad9..7404f69aa 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R +++ b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R @@ -187,6 +187,8 @@ add_weekofmonth <- function(df, time_col, wm = WEEK_ISSUES) { #' @template lag_col-template #' @template ref_lag-template #' +#' @importFrom dplyr full_join left_join +#' @importFrom purrr reduce #' @importFrom tidyr pivot_wider drop_na #' #' @export @@ -203,16 +205,21 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) { names(avg_df)[names(avg_df) == value_col] <- 'value_7dav' avg_df_prev7 <- add_shift(avg_df, 7, refd_col) names(avg_df_prev7)[names(avg_df_prev7) == 'value_7dav'] <- 'value_prev_7dav' - - backfill_df <- Reduce(function(x, y) merge(x, y, all=TRUE), - list(df, avg_df, avg_df_prev7)) + + backfill_df <- reduce( + list(df, avg_df, avg_df_prev7), + full_join, by=c(refd_col, "issue_date") + ) # Add target target_df <- df[df$lag==ref_lag, c(refd_col, value_col, "issue_date")] names(target_df)[names(target_df) == value_col] <- 'value_target' names(target_df)[names(target_df) == 'issue_date'] <- 'target_date' - backfill_df <- merge(backfill_df, target_df, by=refd_col, all.x=TRUE) + backfill_df <- left_join(backfill_df, target_df, by=c(refd_col)) + + # Remove invalid rows + backfill_df <- drop_na(backfill_df, c(lag_col)) # Add log values backfill_df$log_value_raw = log(backfill_df$value_raw + 1) @@ -221,9 +228,6 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) { backfill_df$log_value_prev_7dav = log(backfill_df$value_prev_7dav + 1) backfill_df$log_7dav_slope = backfill_df$log_value_7dav - backfill_df$log_value_prev_7dav - # Remove invalid rows - backfill_df <- drop_na(backfill_df, c(lag_col)) - return (as.data.frame(backfill_df)) } diff --git a/backfill_corrections/params.json.template b/backfill_corrections/params.json.template index 819ee1401..00bbfdf57 100644 --- a/backfill_corrections/params.json.template +++ b/backfill_corrections/params.json.template @@ -9,5 +9,7 @@ "geo_levels": ["state", "county"], "value_types": ["count", "fraction"], "num_col": "num", - "denom_col": "den" + "denom_col": "den", + "parallel": false, + "parallel_max_cores": 2 }