From 0dd2c9d7c3a55cfe2238ee7c4b536383b2fdf162 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Thu, 2 Mar 2023 11:17:05 -0500 Subject: [PATCH 1/7] use joins instead of merge --- .../delphiBackfillCorrection/NAMESPACE | 2 ++ .../R/preprocessing.R | 22 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/backfill_corrections/delphiBackfillCorrection/NAMESPACE b/backfill_corrections/delphiBackfillCorrection/NAMESPACE index 45bec9a37..a12b97aae 100644 --- a/backfill_corrections/delphiBackfillCorrection/NAMESPACE +++ b/backfill_corrections/delphiBackfillCorrection/NAMESPACE @@ -30,9 +30,11 @@ importFrom(dplyr,bind_rows) importFrom(dplyr,desc) importFrom(dplyr,everything) importFrom(dplyr,filter) +importFrom(dplyr,full_join) importFrom(dplyr,group_by) importFrom(dplyr,group_split) importFrom(dplyr,if_else) +importFrom(dplyr,left_join) importFrom(dplyr,pull) importFrom(dplyr,select) importFrom(dplyr,starts_with) diff --git a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R index 094c92cb2..5d5a36724 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R +++ b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R @@ -174,7 +174,7 @@ add_weekofmonth <- function(df, time_col, wm = WEEK_ISSUES) { #' @template lag_col-template #' @template ref_lag-template #' -#' @importFrom dplyr %>% +#' @importFrom dplyr %>% full_join left_join #' @importFrom tidyr pivot_wider drop_na #' #' @export @@ -190,16 +190,25 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) { names(avg_df)[names(avg_df) == value_col] <- 'value_7dav' avg_df_prev7 <- add_shift(avg_df, 7, refd_col) names(avg_df_prev7)[names(avg_df_prev7) == 'value_7dav'] <- 'value_prev_7dav' - - backfill_df <- Reduce(function(x, y) merge(x, y, all=TRUE), - list(df, avg_df, avg_df_prev7)) + + # Join would be faster without the select (minimal) and arrange (3x). It's + # unclear if row order matters for downstream uses of this df. + backfill_df <- Reduce( + function(x, y) full_join(x, y, by=c("time_value", "issue_date")), + list(df, avg_df, avg_df_prev7) + ) + # %>% select(time_value, issue_date, everything()) %>% + # arrange(time_value, issue_date) # Add target target_df <- df[df$lag==ref_lag, c(refd_col, value_col, "issue_date")] names(target_df)[names(target_df) == value_col] <- 'value_target' names(target_df)[names(target_df) == 'issue_date'] <- 'target_date' - backfill_df <- merge(backfill_df, target_df, by=refd_col, all.x=TRUE) + backfill_df <- left_join(backfill_df, target_df, by=c(refd_col)) + + # Remove invalid rows + backfill_df <- drop_na(backfill_df, c(lag_col)) # Add log values backfill_df$log_value_raw = log(backfill_df$value_raw + 1) @@ -208,9 +217,6 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) { backfill_df$log_value_prev_7dav = log(backfill_df$value_prev_7dav + 1) backfill_df$log_7dav_slope = backfill_df$log_value_7dav - backfill_df$log_value_prev_7dav - # Remove invalid rows - backfill_df <- drop_na(backfill_df, c(lag_col)) - return (as.data.frame(backfill_df)) } From 849834eb11f15605520fa659f4db322e383f5589 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Thu, 2 Mar 2023 11:18:00 -0500 Subject: [PATCH 2/7] remove test code --- .../delphiBackfillCorrection/R/preprocessing.R | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R index 5d5a36724..8f0ea725e 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R +++ b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R @@ -191,14 +191,10 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) { avg_df_prev7 <- add_shift(avg_df, 7, refd_col) names(avg_df_prev7)[names(avg_df_prev7) == 'value_7dav'] <- 'value_prev_7dav' - # Join would be faster without the select (minimal) and arrange (3x). It's - # unclear if row order matters for downstream uses of this df. backfill_df <- Reduce( function(x, y) full_join(x, y, by=c("time_value", "issue_date")), list(df, avg_df, avg_df_prev7) ) - # %>% select(time_value, issue_date, everything()) %>% - # arrange(time_value, issue_date) # Add target target_df <- df[df$lag==ref_lag, c(refd_col, value_col, "issue_date")] From c1a9e102a731c977bc7b7dea4822177faeac0dc4 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Fri, 17 Mar 2023 11:16:58 -0400 Subject: [PATCH 3/7] use purrr::reduce instead of base::Reduce --- backfill_corrections/delphiBackfillCorrection/NAMESPACE | 1 + .../delphiBackfillCorrection/R/preprocessing.R | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/backfill_corrections/delphiBackfillCorrection/NAMESPACE b/backfill_corrections/delphiBackfillCorrection/NAMESPACE index e7237e317..787882daf 100644 --- a/backfill_corrections/delphiBackfillCorrection/NAMESPACE +++ b/backfill_corrections/delphiBackfillCorrection/NAMESPACE @@ -50,6 +50,7 @@ importFrom(lubridate,month) importFrom(lubridate,year) importFrom(parallel,detectCores) importFrom(purrr,map_dfc) +importFrom(purrr,reduce) importFrom(quantgen,quantile_lasso) importFrom(readr,write_csv) importFrom(stats,coef) diff --git a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R index 676a9771b..38213724c 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R +++ b/backfill_corrections/delphiBackfillCorrection/R/preprocessing.R @@ -188,6 +188,7 @@ add_weekofmonth <- function(df, time_col, wm = WEEK_ISSUES) { #' @template ref_lag-template #' #' @importFrom dplyr full_join left_join +#' @importFrom purrr reduce #' @importFrom tidyr pivot_wider drop_na #' #' @export @@ -205,10 +206,10 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) { avg_df_prev7 <- add_shift(avg_df, 7, refd_col) names(avg_df_prev7)[names(avg_df_prev7) == 'value_7dav'] <- 'value_prev_7dav' - backfill_df <- Reduce( - function(x, y) full_join(x, y, by=c("time_value", "issue_date")), - list(df, avg_df, avg_df_prev7) - ) + backfill_df <- reduce( + list(df, avg_df, avg_df_prev7), + full_join, by=c(refd_col, "issue_date") + ) # Add target target_df <- df[df$lag==ref_lag, c(refd_col, value_col, "issue_date")] From 226d5914e01763d79a985aedc566d73576424b7b Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Mon, 20 Mar 2023 13:06:45 -0400 Subject: [PATCH 4/7] parallelize loop over geos --- .../delphiBackfillCorrection/NAMESPACE | 3 +++ .../delphiBackfillCorrection/R/main.R | 26 ++++++++++++++----- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/backfill_corrections/delphiBackfillCorrection/NAMESPACE b/backfill_corrections/delphiBackfillCorrection/NAMESPACE index c1bb46628..e5dbab4cb 100644 --- a/backfill_corrections/delphiBackfillCorrection/NAMESPACE +++ b/backfill_corrections/delphiBackfillCorrection/NAMESPACE @@ -47,7 +47,10 @@ importFrom(lubridate,make_date) importFrom(lubridate,month) importFrom(lubridate,year) importFrom(parallel,detectCores) +importFrom(parallel,mclapply) +importFrom(purrr,map) importFrom(purrr,map_dfc) +importFrom(purrr,map_dfr) importFrom(quantgen,quantile_lasso) importFrom(readr,write_csv) importFrom(stats,coef) diff --git a/backfill_corrections/delphiBackfillCorrection/R/main.R b/backfill_corrections/delphiBackfillCorrection/R/main.R index ae5779d03..8518f567d 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/main.R +++ b/backfill_corrections/delphiBackfillCorrection/R/main.R @@ -11,6 +11,8 @@ #' #' @importFrom dplyr %>% filter group_by summarize across everything group_split ungroup #' @importFrom tidyr drop_na +#' @importFrom purrr map map_dfr +#' @importFrom parallel mclapply #' #' @export run_backfill <- function(df, params, @@ -61,7 +63,12 @@ run_backfill <- function(df, params, group_dfs <- group_split(df, geo_value) # Build model for each location - for (subdf in group_dfs) { + apply_fn <- ifelse(params$parallel, mclapply, lapply) + result <- apply_fn(group_dfs, function(subdf) { + # Make a copy with the same structure. + state_test_data_list <- test_data_list + state_coef_list <- coef_list + geo <- subdf$geo_value[1] msg_ts("Processing ", geo, " geo group") @@ -204,21 +211,26 @@ run_backfill <- function(df, params, exponentiate_preds(params$taus) key <- make_key(value_type, signal_suffix) - idx <- length(test_data_list[[key]]) + 1 - test_data_list[[key]][[idx]] <- test_data - coef_list[[key]][[idx]] <- coefs + idx <- length(state_test_data_list[[key]]) + 1 + state_test_data_list[[key]][[idx]] <- test_data + state_coef_list[[key]][[idx]] <- coefs } }# End for test lags }# End for value types }# End for signal suffixes - }# End for geo list + + return(list(coefs = state_coef_list, test_data = state_test_data_list)) + }) # End for geo list + + test_data_list <- map(result, ~.x$test_data) + coef_list <- map(result, ~.x$coefs) if (params$make_predictions) { for (value_type in params$value_types) { for (signal_suffix in signal_suffixes) { key <- make_key(value_type, signal_suffix) - test_combined <- bind_rows(test_data_list[[key]]) - coef_combined <- bind_rows(coef_list[[key]]) + test_combined <- map_dfr(test_data_list, ~.x[[key]]) + coef_combined <- map_dfr(coef_list, ~.x[[key]]) export_test_result(test_combined, coef_combined, indicator=indicator, signal=signal, signal_suffix=signal_suffix, From fdd1b5c6c71af754cdd4c4e9a890c9e00649eaf0 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Mon, 20 Mar 2023 16:40:13 -0400 Subject: [PATCH 5/7] use bettermc parallel apply --- backfill_corrections/delphiBackfillCorrection/DESCRIPTION | 3 ++- backfill_corrections/delphiBackfillCorrection/NAMESPACE | 2 +- backfill_corrections/delphiBackfillCorrection/R/main.R | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/backfill_corrections/delphiBackfillCorrection/DESCRIPTION b/backfill_corrections/delphiBackfillCorrection/DESCRIPTION index b42519175..a66c80353 100644 --- a/backfill_corrections/delphiBackfillCorrection/DESCRIPTION +++ b/backfill_corrections/delphiBackfillCorrection/DESCRIPTION @@ -27,7 +27,8 @@ Imports: parallel, purrr, vctrs, - RcppRoll + RcppRoll, + bettermc Suggests: knitr (>= 1.15), rmarkdown (>= 1.4), diff --git a/backfill_corrections/delphiBackfillCorrection/NAMESPACE b/backfill_corrections/delphiBackfillCorrection/NAMESPACE index e5dbab4cb..ab1030b20 100644 --- a/backfill_corrections/delphiBackfillCorrection/NAMESPACE +++ b/backfill_corrections/delphiBackfillCorrection/NAMESPACE @@ -23,6 +23,7 @@ export(run_backfill) import(covidcast) importFrom(RcppRoll,roll_mean) importFrom(arrow,read_parquet) +importFrom(bettermc,mclapply) importFrom(dplyr,"%>%") importFrom(dplyr,across) importFrom(dplyr,arrange) @@ -47,7 +48,6 @@ importFrom(lubridate,make_date) importFrom(lubridate,month) importFrom(lubridate,year) importFrom(parallel,detectCores) -importFrom(parallel,mclapply) importFrom(purrr,map) importFrom(purrr,map_dfc) importFrom(purrr,map_dfr) diff --git a/backfill_corrections/delphiBackfillCorrection/R/main.R b/backfill_corrections/delphiBackfillCorrection/R/main.R index 8518f567d..ce83cbc22 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/main.R +++ b/backfill_corrections/delphiBackfillCorrection/R/main.R @@ -12,7 +12,7 @@ #' @importFrom dplyr %>% filter group_by summarize across everything group_split ungroup #' @importFrom tidyr drop_na #' @importFrom purrr map map_dfr -#' @importFrom parallel mclapply +#' @importFrom bettermc mclapply #' #' @export run_backfill <- function(df, params, From 16845590b9fc305cf2e6c584d74f31fc9803eda7 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Tue, 21 Mar 2023 09:00:41 -0400 Subject: [PATCH 6/7] reduce messages --- backfill_corrections/delphiBackfillCorrection/R/main.R | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/backfill_corrections/delphiBackfillCorrection/R/main.R b/backfill_corrections/delphiBackfillCorrection/R/main.R index ce83cbc22..b9b0fc892 100644 --- a/backfill_corrections/delphiBackfillCorrection/R/main.R +++ b/backfill_corrections/delphiBackfillCorrection/R/main.R @@ -170,7 +170,7 @@ run_backfill <- function(df, params, test_data <- filtered_data[[2]] if (nrow(train_data) == 0 || nrow(test_data) == 0) { - msg_ts("Not enough data to either train or test for test_lag ", + msg_ts("Not enough data to either train or test for test lag ", test_lag, ", skipping") next } @@ -187,7 +187,6 @@ run_backfill <- function(df, params, params_list <- c(YITL, as.vector(unlist(covariates))) # Model training and testing - msg_ts("Training or loading models") prediction_results <- model_training_and_testing( train_data, test_data, taus = params$taus, covariates = params_list, lp_solver = params$lp_solver, @@ -204,7 +203,6 @@ run_backfill <- function(df, params, # Model objects are saved during training, so only need to export # output if making predictions/corrections if (params$make_predictions) { - msg_ts("Generating predictions") test_data <- prediction_results[[1]] coefs <- prediction_results[[2]] test_data <- evaluate(test_data, params$taus) %>% From a8694d71f0262dc47ccf11817cbea8dcad405747 Mon Sep 17 00:00:00 2001 From: Nat DeFries <42820733+nmdefries@users.noreply.github.com> Date: Tue, 21 Mar 2023 09:02:51 -0400 Subject: [PATCH 7/7] default to using parallel mode in production --- ansible/templates/backfillcorr-params-prod.json.j2 | 4 +++- backfill_corrections/params.json.template | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/ansible/templates/backfillcorr-params-prod.json.j2 b/ansible/templates/backfillcorr-params-prod.json.j2 index b6c08fa59..d4ea9280d 100644 --- a/ansible/templates/backfillcorr-params-prod.json.j2 +++ b/ansible/templates/backfillcorr-params-prod.json.j2 @@ -4,12 +4,14 @@ "cache_dir": "./cache", "testing_window": 1, "training_days": 270, - "lag_pad":2, + "lag_pad": 2, "export_dir": "./receiving", "geo_levels": ["state", "county"], "value_types": ["count", "fraction"], "num_col": "num", "denom_col": "den", + "parallel": true, + "parallel_max_cores": 2, "post": { "aws_credentials": { "aws_access_key_id": "{{ backfillcorr_aws_access_key_id }}", diff --git a/backfill_corrections/params.json.template b/backfill_corrections/params.json.template index 819ee1401..00bbfdf57 100644 --- a/backfill_corrections/params.json.template +++ b/backfill_corrections/params.json.template @@ -9,5 +9,7 @@ "geo_levels": ["state", "county"], "value_types": ["count", "fraction"], "num_col": "num", - "denom_col": "den" + "denom_col": "den", + "parallel": false, + "parallel_max_cores": 2 }