diff --git a/ansible/templates/backfillcorr-params-prod.json.j2 b/ansible/templates/backfillcorr-params-prod.json.j2 index b6c08fa59..d4ea9280d 100644 --- a/ansible/templates/backfillcorr-params-prod.json.j2 +++ b/ansible/templates/backfillcorr-params-prod.json.j2 @@ -4,12 +4,14 @@ "cache_dir": "./cache", "testing_window": 1, "training_days": 270, - "lag_pad":2, + "lag_pad": 2, "export_dir": "./receiving", "geo_levels": ["state", "county"], "value_types": ["count", "fraction"], "num_col": "num", "denom_col": "den", + "parallel": true, + "parallel_max_cores": 2, "post": { "aws_credentials": { "aws_access_key_id": "{{ backfillcorr_aws_access_key_id }}", diff --git a/backfill_corrections/delphiBackfillCorrection/DESCRIPTION b/backfill_corrections/delphiBackfillCorrection/DESCRIPTION index b42519175..a66c80353 100644 --- a/backfill_corrections/delphiBackfillCorrection/DESCRIPTION +++ b/backfill_corrections/delphiBackfillCorrection/DESCRIPTION @@ -27,7 +27,8 @@ Imports: parallel, purrr, vctrs, - RcppRoll + RcppRoll, + bettermc Suggests: knitr (>= 1.15), rmarkdown (>= 1.4), diff --git a/backfill_corrections/delphiBackfillCorrection/NAMESPACE b/backfill_corrections/delphiBackfillCorrection/NAMESPACE index 787882daf..665c41e49 100644 --- a/backfill_corrections/delphiBackfillCorrection/NAMESPACE +++ b/backfill_corrections/delphiBackfillCorrection/NAMESPACE @@ -23,6 +23,7 @@ export(run_backfill) import(covidcast) importFrom(RcppRoll,roll_mean) importFrom(arrow,read_parquet) +importFrom(bettermc,mclapply) importFrom(dplyr,"%>%") importFrom(dplyr,across) importFrom(dplyr,arrange) @@ -49,7 +50,9 @@ importFrom(lubridate,make_date) importFrom(lubridate,month) importFrom(lubridate,year) importFrom(parallel,detectCores) +importFrom(purrr,map) importFrom(purrr,map_dfc) +importFrom(purrr,map_dfr) importFrom(purrr,reduce) importFrom(quantgen,quantile_lasso) importFrom(readr,write_csv) diff --git a/backfill_corrections/delphiBackfillCorrection/R/main.R b/backfill_corrections/delphiBackfillCorrection/R/main.R index ae5779d03..b9b0fc892 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/main.R +++ b/backfill_corrections/delphiBackfillCorrection/R/main.R @@ -11,6 +11,8 @@ #' #' @importFrom dplyr %>% filter group_by summarize across everything group_split ungroup #' @importFrom tidyr drop_na +#' @importFrom purrr map map_dfr +#' @importFrom bettermc mclapply #' #' @export run_backfill <- function(df, params, @@ -61,7 +63,12 @@ run_backfill <- function(df, params, group_dfs <- group_split(df, geo_value) # Build model for each location - for (subdf in group_dfs) { + apply_fn <- ifelse(params$parallel, mclapply, lapply) + result <- apply_fn(group_dfs, function(subdf) { + # Make a copy with the same structure. + state_test_data_list <- test_data_list + state_coef_list <- coef_list + geo <- subdf$geo_value[1] msg_ts("Processing ", geo, " geo group") @@ -163,7 +170,7 @@ run_backfill <- function(df, params, test_data <- filtered_data[[2]] if (nrow(train_data) == 0 || nrow(test_data) == 0) { - msg_ts("Not enough data to either train or test for test_lag ", + msg_ts("Not enough data to either train or test for test lag ", test_lag, ", skipping") next } @@ -180,7 +187,6 @@ run_backfill <- function(df, params, params_list <- c(YITL, as.vector(unlist(covariates))) # Model training and testing - msg_ts("Training or loading models") prediction_results <- model_training_and_testing( train_data, test_data, taus = params$taus, covariates = params_list, lp_solver = params$lp_solver, @@ -197,28 +203,32 @@ run_backfill <- function(df, params, # Model objects are saved during training, so only need to export # output if making predictions/corrections if (params$make_predictions) { - msg_ts("Generating predictions") test_data <- prediction_results[[1]] coefs <- prediction_results[[2]] test_data <- evaluate(test_data, params$taus) %>% exponentiate_preds(params$taus) key <- make_key(value_type, signal_suffix) - idx <- length(test_data_list[[key]]) + 1 - test_data_list[[key]][[idx]] <- test_data - coef_list[[key]][[idx]] <- coefs + idx <- length(state_test_data_list[[key]]) + 1 + state_test_data_list[[key]][[idx]] <- test_data + state_coef_list[[key]][[idx]] <- coefs } }# End for test lags }# End for value types }# End for signal suffixes - }# End for geo list + + return(list(coefs = state_coef_list, test_data = state_test_data_list)) + }) # End for geo list + + test_data_list <- map(result, ~.x$test_data) + coef_list <- map(result, ~.x$coefs) if (params$make_predictions) { for (value_type in params$value_types) { for (signal_suffix in signal_suffixes) { key <- make_key(value_type, signal_suffix) - test_combined <- bind_rows(test_data_list[[key]]) - coef_combined <- bind_rows(coef_list[[key]]) + test_combined <- map_dfr(test_data_list, ~.x[[key]]) + coef_combined <- map_dfr(coef_list, ~.x[[key]]) export_test_result(test_combined, coef_combined, indicator=indicator, signal=signal, signal_suffix=signal_suffix, diff --git a/backfill_corrections/params.json.template b/backfill_corrections/params.json.template index 819ee1401..00bbfdf57 100644 --- a/backfill_corrections/params.json.template +++ b/backfill_corrections/params.json.template @@ -9,5 +9,7 @@ "geo_levels": ["state", "county"], "value_types": ["count", "fraction"], "num_col": "num", - "denom_col": "den" + "denom_col": "den", + "parallel": false, + "parallel_max_cores": 2 }