From 0c6ca7da2009ddbcfce49d3639223058a6f9d818 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 13 May 2018 09:07:14 -0700 Subject: [PATCH 1/7] [SPARK-24187][R][SQL]Adding array_join function to SparkR --- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 25 +++++++++++++++++++++++++ R/pkg/R/generics.R | 4 ++++ R/pkg/tests/fulltests/test_sparkSQL.R | 10 ++++++++++ 4 files changed, 40 insertions(+) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 73a33af4dd48..9696f6987ad7 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -201,6 +201,7 @@ exportMethods("%<=>%", "approxCountDistinct", "approxQuantile", "array_contains", + "array_join", "array_max", "array_min", "array_position", diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index abc91aeeb482..10cd858ce57b 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -222,6 +222,9 @@ NULL #' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp)) #' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5))) #' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))} +#' head(select(tmp3, element_at(tmp3$v3, "Valiant"))) +#' tmp4 <- mutate(df, v4 = create_array(df$model, df$model)) +#' head(select(tmp4, array_join(tmp4$v4, "#"), array_join(tmp4$v4, "#", "NULL")))} NULL #' Window functions for Column operations @@ -3006,6 +3009,28 @@ setMethod("array_contains", column(jc) }) +#' @details +#' \code{array_join}: Concatenates the elements of column using the delimiter. +#' Null values are replaced with null_replacement if set, otherwise they are ignored. +#' +#' @param delimiter character(s) to use to concatenate the elements of column. +#' @param null_replacement character(s) to use to replace the Null values. +#' @rdname column_collection_functions +#' @aliases array_join array_join,Column-method +#' @note array_join since 2.4.0 +setMethod("array_join", + signature(x = "Column"), + function(x, delimiter, null_replacement = NA) { + jc <- if (is.na(null_replacement)) { + callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter) + } + else { + callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter, + null_replacement) + } + column(jc) + }) + #' @details #' \code{array_max}: Returns the maximum value of the array. #' diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 8894cb1c5b92..9321bbaf96ff 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -757,6 +757,10 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCoun #' @name NULL setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") }) +#' @rdname column_collection_functions +#' @name NULL +setGeneric("array_join", function(x, delimiter, ...) 
{ standardGeneric("array_join") }) + #' @rdname column_collection_functions #' @name NULL setGeneric("array_max", function(x) { standardGeneric("array_max") }) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 16c1fd5a065e..65f435eb95f6 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1518,6 +1518,16 @@ test_that("column functions", { result <- collect(select(df, arrays_overlap(df[[1]], df[[2]])))[[1]] expect_equal(result, c(TRUE, FALSE, NA)) + # Test array_join() + df <- createDataFrame(list(list(list("Hello", "World!")))) + result <- collect(select(df, array_join(df[[1]], "#")))[[1]] + expect_equal(result, "Hello#World!") + df2 <- createDataFrame(list(list(list("Hello", NA, "World!")))) + result <- collect(select(df2, array_join(df2[[1]], "#", "Beautiful")))[[1]] + expect_equal(result, "Hello#Beautiful#World!") + result <- collect(select(df2, array_join(df2[[1]], "#")))[[1]] + expect_equal(result, "Hello#World!") + # Test array_sort() and sort_array() df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L)))) From 01858f2f34bac833bd6dd2e749354e6f6646c951 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 13 May 2018 14:51:35 -0700 Subject: [PATCH 2/7] change null_replacement to nullReplacement --- R/pkg/R/functions.R | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 10cd858ce57b..6740f6867d84 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3011,22 +3011,22 @@ setMethod("array_contains", #' @details #' \code{array_join}: Concatenates the elements of column using the delimiter. -#' Null values are replaced with null_replacement if set, otherwise they are ignored. +#' Null values are replaced with nullReplacement if set, otherwise they are ignored. #' #' @param delimiter character(s) to use to concatenate the elements of column. -#' @param null_replacement character(s) to use to replace the Null values. +#' @param nullReplacement character(s) to use to replace the Null values. 
#' @rdname column_collection_functions #' @aliases array_join array_join,Column-method #' @note array_join since 2.4.0 setMethod("array_join", signature(x = "Column"), - function(x, delimiter, null_replacement = NA) { - jc <- if (is.na(null_replacement)) { + function(x, delimiter, nullReplacement = NA) { + jc <- if (is.na(nullReplacement)) { callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter) } else { callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter, - null_replacement) + nullReplacement) } column(jc) }) From e05e701f3027607fc6942a81e1a9f8d0a5cc6e5f Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 14 May 2018 10:04:42 -0700 Subject: [PATCH 3/7] resolve conflicts --- R/pkg/R/functions.R | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 6740f6867d84..5f38a05a5561 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -221,10 +221,9 @@ NULL #' head(select(tmp3, element_at(tmp3$v3, "Valiant"))) #' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp)) #' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5))) -#' head(select(tmp, concat(df$mpg, df$cyl, df$hp)))} -#' head(select(tmp3, element_at(tmp3$v3, "Valiant"))) -#' tmp4 <- mutate(df, v4 = create_array(df$model, df$model)) -#' head(select(tmp4, array_join(tmp4$v4, "#"), array_join(tmp4$v4, "#", "NULL")))} +#' head(select(tmp, concat(df$mpg, df$cyl, df$hp))) +#' tmp5 <- mutate(df, v6 = create_array(df$model, df$model)) +#' head(select(tmp5, array_join(tmp5$v6, "#"), array_join(tmp5$v6, "#", "NULL")))} NULL #' Window functions for Column operations @@ -3222,8 +3221,8 @@ setMethod("size", #' (or starting from the end if start is negative) with the specified length. #' #' @rdname column_collection_functions -#' @param start an index indicating the first element occuring in the result. -#' @param length a number of consecutive elements choosen to the result. +#' @param start an index indicating the first element occurring in the result. +#' @param length a number of consecutive elements chosen to the result. 
#' @aliases slice slice,Column-method #' @note slice since 2.4.0 setMethod("slice", From 92c41c5c98fe8e8802201cb1799b0cf5b56762ac Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sat, 2 Jun 2018 18:25:20 -0700 Subject: [PATCH 4/7] address comments --- R/pkg/R/functions.R | 3 +-- R/pkg/tests/fulltests/test_sparkSQL.R | 5 +++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 5f38a05a5561..42b88f134b9d 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3022,8 +3022,7 @@ setMethod("array_join", function(x, delimiter, nullReplacement = NA) { jc <- if (is.na(nullReplacement)) { callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter) - } - else { + } else { callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter, nullReplacement) } diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 65f435eb95f6..36e0f78bb059 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1527,6 +1527,11 @@ test_that("column functions", { expect_equal(result, "Hello#Beautiful#World!") result <- collect(select(df2, array_join(df2[[1]], "#")))[[1]] expect_equal(result, "Hello#World!") + df3 <- createDataFrame(list(list(list("Hello", NULL, "World!")))) + result <- collect(select(df3, array_join(df3[[1]], "#", "Beautiful")))[[1]] + expect_equal(result, "Hello#Beautiful#World!") + result <- collect(select(df3, array_join(df3[[1]], "#")))[[1]] + expect_equal(result, "Hello#World!") # Test array_sort() and sort_array() df <- createDataFrame(list(list(list(2L, 1L, 3L, NA)), list(list(NA, 6L, 5L, NA, 4L)))) From 901ff32a03c6ec0c16a0ff7c625781ccf2355a54 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Sun, 3 Jun 2018 15:23:51 -0700 Subject: [PATCH 5/7] change default value to NULL --- R/pkg/R/functions.R | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 42b88f134b9d..43dbacbfd6d5 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3012,15 +3012,15 @@ setMethod("array_contains", #' \code{array_join}: Concatenates the elements of column using the delimiter. #' Null values are replaced with nullReplacement if set, otherwise they are ignored. #' -#' @param delimiter character(s) to use to concatenate the elements of column. -#' @param nullReplacement character(s) to use to replace the Null values. +#' @param delimiter a character string that is used to concatenate the elements of column. +#' @param nullReplacement a character string that is used to replace the Null values. 
#' @rdname column_collection_functions #' @aliases array_join array_join,Column-method #' @note array_join since 2.4.0 setMethod("array_join", - signature(x = "Column"), - function(x, delimiter, nullReplacement = NA) { - jc <- if (is.na(nullReplacement)) { + signature(x = "Column", delimiter = "character"), + function(x, delimiter, nullReplacement = NULL) { + jc <- if (is.null(nullReplacement)) { callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter) } else { callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter, From b0b14151e42ef638644a6072e0edbec240283c1f Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 4 Jun 2018 10:04:57 -0700 Subject: [PATCH 6/7] coerce nullReplacement to character type --- R/pkg/R/functions.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 43dbacbfd6d5..9989a8b10920 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3024,7 +3024,7 @@ setMethod("array_join", callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter) } else { callJStatic("org.apache.spark.sql.functions", "array_join", x@jc, delimiter, - nullReplacement) + as.character(nullReplacement)) } column(jc) }) From 55b45180f223356887d88c41b4ae538500fe31ae Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Mon, 4 Jun 2018 23:58:33 -0700 Subject: [PATCH 7/7] address comment --- R/pkg/R/functions.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 9989a8b10920..3bff633fbc1f 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3013,7 +3013,7 @@ setMethod("array_contains", #' Null values are replaced with nullReplacement if set, otherwise they are ignored. #' #' @param delimiter a character string that is used to concatenate the elements of column. -#' @param nullReplacement a character string that is used to replace the Null values. +#' @param nullReplacement an optional character string that is used to replace the Null values. #' @rdname column_collection_functions #' @aliases array_join array_join,Column-method #' @note array_join since 2.4.0
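
Reviewer note (not part of the patches): a minimal SparkR usage sketch of the
array_join method introduced above, mirroring the tests added to
test_sparkSQL.R. It assumes a running SparkSession; the data frame and values
are illustrative only.

    library(SparkR)
    sparkR.session()

    # Array column containing a null element
    df <- createDataFrame(list(list(list("Hello", NA, "World!"))))

    # Default behaviour: null elements are skipped when joining
    collect(select(df, array_join(df[[1]], "#")))[[1]]
    # "Hello#World!"

    # With nullReplacement: null elements are replaced before joining
    collect(select(df, array_join(df[[1]], "#", "Beautiful")))[[1]]
    # "Hello#Beautiful#World!"

As the series settles on nullReplacement = NULL as the default (patch 5) with
as.character() coercion of the replacement (patch 6), omitting the third
argument and passing NULL explicitly behave the same way: both take the
two-argument org.apache.spark.sql.functions.array_join path and ignore nulls.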