From e21d51e2a7a3794b2807c18239e1ce889dc41dcf Mon Sep 17 00:00:00 2001 From: zero323 Date: Wed, 3 May 2017 11:30:22 +0200 Subject: [PATCH 1/8] Initial implementation --- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 30 +++++++++++++++++++++++ R/pkg/R/generics.R | 4 +++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 12 +++++++++ 4 files changed, 47 insertions(+) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 7ecd168137e8..daa168c87ecd 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -123,6 +123,7 @@ exportMethods("arrange", "group_by", "groupBy", "head", + "hint", "insertInto", "intersect", "isLocal", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 7e57ba6287bb..f4070ab6e791 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3715,3 +3715,33 @@ setMethod("rollup", sgd <- callJMethod(x@sdf, "rollup", jcol) groupedData(sgd) }) + +#' hint +#' +#' Specifies execution plan hint on the current SparkDataFrame. +#' +#' @param x a SparkDataFrame. +#' @param name a name of the hint. +#' @param ... additional argument(s) passed to the method. +#' +#' @return A SparkDataFrame. +#' @family SparkDataFrame functions +#' @aliases hint,SparkDataFrame,character-method +#' @rdname hint +#' @name hint +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(mtcars) +#' avg_mpg <- mean(groupBy(createDataFrame(mtcars), "cyl"), "mpg") +#' +#' head(join(df, hint(avg_mpg, "broadcast"), df$cyl == avg_mpg$cyl)) +#' } +setMethod("hint", + signature(x = "SparkDataFrame", name = "character"), + function(x, name, ...) { + parameters <-list(...) + stopifnot(all(sapply(parameters, is.character))) + jdf <- callJMethod(x@sdf, "hint", name, parameters) + dataFrame(jdf) + }) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index e02d46426a5a..1233a7c9050e 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -572,6 +572,10 @@ setGeneric("first", function(x, ...) { standardGeneric("first") }) #' @export setGeneric("group_by", function(x, ...) 
{ standardGeneric("group_by") }) +#' @rdname hint +#' @export +setGeneric("hint", function(x, name, ...) { standardGeneric("hint") }) + #' @rdname groupBy #' @export setGeneric("groupBy", function(x, ...) { standardGeneric("groupBy") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 12867c15d1f9..f94e343a0de5 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2147,6 +2147,18 @@ test_that("join(), crossJoin() and merge() on a DataFrame", { unlink(jsonPath2) unlink(jsonPath3) + + # Join with broadcast hint + df1 <- sql("SELECT * FROM range(10e10)") + df2 <- sql("SELECT * FROM range(10e10)") + + execution_plan <- capture.output(explain(join(df1, df2, df1$id == df2$id))) + expect_false(any(grepl("BroadcastHashJoin", execution_plan))) + + execution_plan_hint <- capture.output( + explain(join(df1, hint(df2, "broadcast"), df1$id == df2$id)) + ) + expect_true(any(grepl("BroadcastHashJoin", execution_plan_hint))) }) test_that("toJSON() on DataFrame", { From 261e5a636198a1dcc87770bff1cc75bc745b4043 Mon Sep 17 00:00:00 2001 From: zero323 Date: Wed, 3 May 2017 11:48:51 +0200 Subject: [PATCH 2/8] Add since note --- R/pkg/R/DataFrame.R | 1 + 1 file changed, 1 insertion(+) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index f4070ab6e791..62ac2e429daf 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3737,6 +3737,7 @@ setMethod("rollup", #' #' head(join(df, hint(avg_mpg, "broadcast"), df$cyl == avg_mpg$cyl)) #' } +#' @note hint since 2.2.0 setMethod("hint", signature(x = "SparkDataFrame", name = "character"), function(x, name, ...) 
{ From ee52b53d1668ab1b48cf0ce659fbb9e32ebd2a3c Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 02:43:24 +0200 Subject: [PATCH 3/8] Fix style --- R/pkg/R/DataFrame.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 62ac2e429daf..0631075137c3 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3741,7 +3741,7 @@ setMethod("rollup", setMethod("hint", signature(x = "SparkDataFrame", name = "character"), function(x, name, ...) { - parameters <-list(...) + parameters <- list(...) stopifnot(all(sapply(parameters, is.character))) jdf <- callJMethod(x@sdf, "hint", name, parameters) dataFrame(jdf) From a1e92335ad5075d7d6f36b1f114c9ce0023c940b Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 07:34:10 +0200 Subject: [PATCH 4/8] Put hint generic in the right place --- R/pkg/R/generics.R | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 1233a7c9050e..56ef1bee9353 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -572,14 +572,14 @@ setGeneric("first", function(x, ...) { standardGeneric("first") }) #' @export setGeneric("group_by", function(x, ...) { standardGeneric("group_by") }) -#' @rdname hint -#' @export -setGeneric("hint", function(x, name, ...) { standardGeneric("hint") }) - #' @rdname groupBy #' @export setGeneric("groupBy", function(x, ...) { standardGeneric("groupBy") }) +#' @rdname hint +#' @export +setGeneric("hint", function(x, name, ...) { standardGeneric("hint") }) + #' @rdname insertInto #' @export setGeneric("insertInto", function(x, tableName, ...) 
{ standardGeneric("insertInto") }) From 633b038c1cc90578ce6d3e2f8991aa7a49437f2a Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 07:49:46 +0200 Subject: [PATCH 5/8] Remove empty line --- R/pkg/R/DataFrame.R | 1 - 1 file changed, 1 deletion(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 0631075137c3..c2085be2c674 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3723,7 +3723,6 @@ setMethod("rollup", #' @param x a SparkDataFrame. #' @param name a name of the hint. #' @param ... additional argument(s) passed to the method. -#' #' @return A SparkDataFrame. #' @family SparkDataFrame functions #' @aliases hint,SparkDataFrame,character-method From 3ed4d760862f49017d44e695931b8fcba207615e Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 08:02:33 +0200 Subject: [PATCH 6/8] Adjust hint description --- R/pkg/R/DataFrame.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index c2085be2c674..ff436999f885 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3718,7 +3718,7 @@ setMethod("rollup", #' hint #' -#' Specifies execution plan hint on the current SparkDataFrame. +#' Hint and return a new SparkDataFrame #' #' @param x a SparkDataFrame. #' @param name a name of the hint. From 1183441a8dcebb8938081a0a7b2203ae2809b30b Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 08:03:24 +0200 Subject: [PATCH 7/8] Adjust ... description --- R/pkg/R/DataFrame.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index ff436999f885..b9a7d1c168dc 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3722,7 +3722,7 @@ setMethod("rollup", #' #' @param x a SparkDataFrame. #' @param name a name of the hint. -#' @param ... additional argument(s) passed to the method. +#' @param ... optional parameters for the hint. #' @return A SparkDataFrame. 
#' @family SparkDataFrame functions #' @aliases hint,SparkDataFrame,character-method From e6c6d82d0494da1645d1f1e3c113c07fadf004cd Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 09:44:28 +0200 Subject: [PATCH 8/8] Adjust description --- R/pkg/R/DataFrame.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b9a7d1c168dc..1c8869202f67 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3718,7 +3718,7 @@ setMethod("rollup", #' hint #' -#' Hint and return a new SparkDataFrame +#' Specifies execution plan hint and returns a new SparkDataFrame. #' #' @param x a SparkDataFrame. #' @param name a name of the hint.