From f8e19202c7bc7698d52a24f9da0d90baf7f8e034 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Thu, 18 Aug 2016 11:35:17 +0900 Subject: [PATCH 01/21] test for array and byte columns --- R/pkg/R/SQLContext.R | 1 + R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 0c06bba639d9b..d5eeb24e2e8a8 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -205,6 +205,7 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { lapply(data, getInternalType) # convert to rows + # TODO Clark: data <- split(data, 1:nrow(df)) args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) data <- do.call(mapply, append(args, data)) } diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 3ccb8b6d77bf6..ae552cdab97f1 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2246,6 +2246,17 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { x[, c("a", "b", "c")] }) expect_identical(expected, result) + + # Ensure UDF's can return columns containing arrays and bytes + add_list_columns <- function(x){ + bytes <- serialize(1:5, NULL) + x$bytecolumn <- rep(list(bytes), nrow(x)) + x$arraycolumn <- rep(list(1:5), nrow(x)) + x + } + expected <- add_list_columns(ldf) + result <- dapplyCollect(df, add_list_columns) + expect_equal(expected, result) }) test_that("repartition by columns on DataFrame", { From 13366051459da3533ed873e1ec96e056ac25a3e1 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Thu, 18 Aug 2016 14:40:18 +0900 Subject: [PATCH 02/21] R createDataFrame.default uses list of dataframes as rows --- R/pkg/R/SQLContext.R | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index d5eeb24e2e8a8..90fcf21b9f16a 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -184,30 +184,23 @@ getDefaultSqlSource <- function() { createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { sparkSession <- getSparkSession() if (is.data.frame(data)) { - # get the names of columns, they will be put into RDD - if (is.null(schema)) { - schema <- names(data) - } + # get the names of columns, they will be put into RDD + if (is.null(schema)) { + schema <- names(data) + } - # get rid of factor type - cleanCols <- function(x) { - if (is.factor(x)) { - as.character(x) - } else { - x - } - } + # convert factors to character + factor_columns <- sapply(data, is.factor) + data[factor_columns] <- lapply(data[factor_columns], as.character) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + colnames(data) <- NULL - # check if all columns have supported type - lapply(data, getInternalType) + # check if all columns have supported type + lapply(data, getInternalType) - # convert to rows - # TODO Clark: data <- split(data, 1:nrow(df)) - args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) - data <- do.call(mapply, append(args, data)) + # data is now a list of rows, and each row is a data.frame + data <- split(data, seq_len(nrow(data))) + names(data) <- NULL } if (is.list(data)) { sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) From a0e13ffaf0f9ff0a3f3b9f63a819292d4269856e Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 08:38:14 +0900 Subject: [PATCH 03/21] test is 
working now --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index ae552cdab97f1..3047d5239cb11 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2247,16 +2247,17 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { }) expect_identical(expected, result) - # Ensure UDF's can return columns containing arrays and bytes - add_list_columns <- function(x){ - bytes <- serialize(1:5, NULL) - x$bytecolumn <- rep(list(bytes), nrow(x)) - x$arraycolumn <- rep(list(1:5), nrow(x)) - x - } - expected <- add_list_columns(ldf) - result <- dapplyCollect(df, add_list_columns) - expect_equal(expected, result) + # Ensure UDF's operate on list columns containing arrays and bytes + df_listcols <- data.frame(key = 1:3) + df_listcols$bytes <- lapply(df_listcols$key, serialize, connection = NULL) + df_listcols$arr <- lapply(df_listcols$key, + function(x) seq(0, 1, length.out=15)) + + df_listcols_spark <- createDataFrame(df_listcols) + result <- dapplyCollect(df_listcols_spark, function(x) x) + + expect_equal(df_listcols, result) + }) test_that("repartition by columns on DataFrame", { From 50152339d0f0fa6fd071483326b6b43cd8c26a3d Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 09:37:09 +0900 Subject: [PATCH 04/21] add simple collect() to test --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 3047d5239cb11..cf1baa2ae9155 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -381,6 +381,7 @@ test_that("create DataFrame with complex types", { expect_equal(s$b, 3L) }) +# TODO Clark: Make sure this one passes test_that("create DataFrame from a data.frame with complex types", { ldf <- data.frame(row.names = 1:2) ldf$a_list <- list(list(1, 2), list(3, 4)) @@ -2254,9 +2255,13 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { function(x) seq(0, 1, length.out=15)) df_listcols_spark <- createDataFrame(df_listcols) - result <- dapplyCollect(df_listcols_spark, function(x) x) - expect_equal(df_listcols, result) + # TODO Clark: Right now both of these lines fail + result1 <- collect(df_listcols_spark) + result2 <- dapplyCollect(df_listcols_spark, function(x) x) + + expect_equal(df_listcols, result1) + expect_equal(df_listcols, result2) }) From d044054da410a94dd97f3321bc0d1ba70c5d5b2d Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 10:28:35 +0900 Subject: [PATCH 05/21] document how tests fail --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 33 +++++++++++++++++++---- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index cf1baa2ae9155..213d39ff9621e 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2248,21 +2248,44 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { }) expect_identical(expected, result) +############################################################ # Ensure UDF's operate on list columns containing arrays and bytes df_listcols <- data.frame(key = 1:3) df_listcols$bytes <- lapply(df_listcols$key, serialize, connection = NULL) - df_listcols$arr <- 
lapply(df_listcols$key, - function(x) seq(0, 1, length.out=15)) - + #df_listcols$arr <- lapply(df_listcols$key, + # function(x) seq(0, 1, length.out=15)) df_listcols_spark <- createDataFrame(df_listcols) - # TODO Clark: Right now both of these lines fail +# TODO Clark: I think these warnings come from serialize. Should eliminate +# them? +#> result1 <- collect(df_listcols_spark) +#Warning messages: +#1: closing unused connection 4 (->localhost:50241) +#2: closing unused connection 3 (->localhost:50242) + +# TODO Clark: This line fails if the numeric array column is added result1 <- collect(df_listcols_spark) + + expect_identical(df_listcols, result1) +# Fails on patched version with: +#Error: `df_listcols` not equal to `result1`. +#Component “bytes”: Component 1: Modes: raw, list +#Component “bytes”: Component 1: Lengths: 26, 1 +#Component “bytes”: Component 1: target is raw, current is list + + result2 <- dapplyCollect(df_listcols_spark, function(x) x) +# Fails on patched version with: +#R computation failed with +# Error in (function (..., row.names = NULL, check.rows = FALSE, check.names = TRUE, : +# arguments imply differing number of rows: 3, 26 +# at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) + - expect_equal(df_listcols, result1) expect_equal(df_listcols, result2) +############################################################ + }) test_that("repartition by columns on DataFrame", { From 311b55400b2c7787b74ae18d6fe0854dba29eb5d Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 11:20:22 +0900 Subject: [PATCH 06/21] identified where patch went wrong --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 213d39ff9621e..ec4735a066569 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2252,31 +2252,40 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { # Ensure UDF's operate on list columns containing arrays and bytes df_listcols <- data.frame(key = 1:3) df_listcols$bytes <- lapply(df_listcols$key, serialize, connection = NULL) +# TODO Clark: Related issue- The dataframe can't be collected if this +# column is added: #df_listcols$arr <- lapply(df_listcols$key, # function(x) seq(0, 1, length.out=15)) df_listcols_spark <- createDataFrame(df_listcols) -# TODO Clark: I think these warnings come from serialize. Should eliminate +# class SparkDataFrame +#PATCH: SparkDataFrame[key:int, bytes:array] +#MASTER: SparkDataFrame[key:int, bytes:binary] +# So clearly MASTER is how it should be here + + +# TODO Clark: I think these warnings come from serialize. Should I try to eliminate # them? #> result1 <- collect(df_listcols_spark) #Warning messages: #1: closing unused connection 4 (->localhost:50241) #2: closing unused connection 3 (->localhost:50242) -# TODO Clark: This line fails if the numeric array column is added result1 <- collect(df_listcols_spark) expect_identical(df_listcols, result1) -# Fails on patched version with: +# PASS MASTER +# FAIL PATCH #Error: `df_listcols` not equal to `result1`. 
#Component “bytes”: Component 1: Modes: raw, list #Component “bytes”: Component 1: Lengths: 26, 1 #Component “bytes”: Component 1: target is raw, current is list - result2 <- dapplyCollect(df_listcols_spark, function(x) x) -# Fails on patched version with: -#R computation failed with +# FAIL MASTER +# FAIL PATCH +# Same error message: +# R computation failed with # Error in (function (..., row.names = NULL, check.rows = FALSE, check.names = TRUE, : # arguments imply differing number of rows: 3, 26 # at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) From 2d2654dff670094b7a97d7be593544f65183cd5a Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 11:48:29 +0900 Subject: [PATCH 07/21] back to original code for creating data frame --- R/pkg/R/SQLContext.R | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 90fcf21b9f16a..c19ecf93903c6 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -183,25 +183,34 @@ getDefaultSqlSource <- function() { # TODO(davies): support sampling and infer type from NA createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { sparkSession <- getSparkSession() + + # Convert dataframes into a list of rows. Each row is a list if (is.data.frame(data)) { - # get the names of columns, they will be put into RDD - if (is.null(schema)) { - schema <- names(data) - } + # get the names of columns, they will be put into RDD + if (is.null(schema)) { + schema <- names(data) + } - # convert factors to character - factor_columns <- sapply(data, is.factor) - data[factor_columns] <- lapply(data[factor_columns], as.character) + # get rid of factor type + cleanCols <- function(x) { + if (is.factor(x)) { + as.character(x) + } else { + x + } + } - colnames(data) <- NULL + # drop factors and wrap lists + data <- setNames(lapply(data, cleanCols), NULL) - # check if all columns have supported type - lapply(data, getInternalType) + # check if all columns have supported type + lapply(data, getInternalType) - # data is now a list of rows, and each row is a data.frame - data <- split(data, seq_len(nrow(data))) - names(data) <- NULL + # convert to rows + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + data <- do.call(mapply, append(args, data)) } + if (is.list(data)) { sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) rdd <- parallelize(sc, data) From ff1a0d02ea25e134980bf6111ec269f0550b3a80 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 16:25:12 +0900 Subject: [PATCH 08/21] first pass modifying worker.R --- R/pkg/inst/worker/worker.R | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index debf0180183a4..2dc7a5e912974 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -27,6 +27,15 @@ elapsedSecs <- function() { proc.time()[3] } +# Raws (binary data) require a list column in a data.frame +# argument inputData should be a list of rows, with each row a list +rbind_with_raws <- function(inputData, rawcolumns){ + args <- c(inputData, stringsAsFactors = FALSE) + inputData <- as.data.frame(do.call(rbind, args)) + inputData[!rawcolumns] <- lapply(inputData[!rawcolumns], unlist) + inputData +} + compute <- function(mode, partition, serializer, deserializer, key, colNames, computeFunc, inputData) { if (mode > 0) { @@ -36,7 +45,15 @@ compute <- 
function(mode, partition, serializer, deserializer, key, # available since R 3.2.4. So we set the global option here. oldOpt <- getOption("stringsAsFactors") options(stringsAsFactors = FALSE) - inputData <- do.call(rbind.data.frame, inputData) + +# TODO Clark: + # Handle binary data types + rawcolumns <- ("raw" == sapply(row1, class)) + if(any(rawcolumns)){ + inputData <- rbind_with_raws(inputData, rawcolumns) + } else { + inputData <- do.call(rbind.data.frame, inputData) + } options(stringsAsFactors = oldOpt) names(inputData) <- colNames From 1e27ef3f458ac6ccc23cccc2f3624c76e7cd4d0b Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 16:37:31 +0900 Subject: [PATCH 09/21] no change in error message, reverting --- R/pkg/inst/worker/worker.R | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 2dc7a5e912974..c700a751c554f 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -46,14 +46,16 @@ compute <- function(mode, partition, serializer, deserializer, key, oldOpt <- getOption("stringsAsFactors") options(stringsAsFactors = FALSE) -# TODO Clark: - # Handle binary data types - rawcolumns <- ("raw" == sapply(row1, class)) - if(any(rawcolumns)){ - inputData <- rbind_with_raws(inputData, rawcolumns) - } else { - inputData <- do.call(rbind.data.frame, inputData) - } +# TODO Clark: This didn't change error message +# Handle binary data types +#rawcolumns <- ("raw" == sapply(row1, class)) +#if(any(rawcolumns)){ +# inputData <- rbind_with_raws(inputData, rawcolumns) +#} else { +# inputData <- do.call(rbind.data.frame, inputData) +#} + + inputData <- do.call(rbind.data.frame, inputData) options(stringsAsFactors = oldOpt) names(inputData) <- colNames From 25d0ec12436c7f77b79281c348c04ddac64f2156 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Fri, 19 Aug 2016 16:42:51 +0900 Subject: [PATCH 10/21] an experiment modifying dapplyCollect directly --- R/pkg/R/DataFrame.R | 5 +++-- R/pkg/inst/tests/testthat/test_sparkSQL.R | 9 ++------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 09be06de06b52..305fa73887cba 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1381,8 +1381,9 @@ setMethod("dapplyCollect", # which is a serialized data.frame corresponds to one partition of the # SparkDataFrame. ldfs <- lapply(content, function(x) { unserialize(x[[1]]) }) - ldf <- do.call(rbind, ldfs) - row.names(ldf) <- NULL +# TODO Clark: a little experiment- comment out these two lines + #ldf <- do.call(rbind, ldfs) + #row.names(ldf) <- NULL ldf }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index ec4735a066569..3355dae682d7a 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2275,22 +2275,17 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { expect_identical(df_listcols, result1) # PASS MASTER -# FAIL PATCH -#Error: `df_listcols` not equal to `result1`. 
-#Component “bytes”: Component 1: Modes: raw, list -#Component “bytes”: Component 1: Lengths: 26, 1 -#Component “bytes”: Component 1: target is raw, current is list +# PASS PATCH result2 <- dapplyCollect(df_listcols_spark, function(x) x) # FAIL MASTER # FAIL PATCH -# Same error message: +# Same error message each time: # R computation failed with # Error in (function (..., row.names = NULL, check.rows = FALSE, check.names = TRUE, : # arguments imply differing number of rows: 3, 26 # at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) - expect_equal(df_listcols, result2) ############################################################ From 77a9822e4cb15c8ee8d48e95597e8e783d2208bb Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 10:50:49 +0900 Subject: [PATCH 11/21] dapplyCollect worked, it just nested things --- R/pkg/R/DataFrame.R | 8 +++--- R/pkg/R/utils.R | 8 ++++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 1 + R/pkg/inst/worker/worker.R | 31 ++++++++++++++--------- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 305fa73887cba..d15ca8629735e 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1381,9 +1381,11 @@ setMethod("dapplyCollect", # which is a serialized data.frame corresponds to one partition of the # SparkDataFrame. ldfs <- lapply(content, function(x) { unserialize(x[[1]]) }) -# TODO Clark: a little experiment- comment out these two lines - #ldf <- do.call(rbind, ldfs) - #row.names(ldf) <- NULL +return(ldfs) +# TODO Clark: a little experiment- return early to show that +# initial trouble comes from the collect method + ldf <- do.call(rbind, ldfs) + row.names(ldf) <- NULL ldf }) diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index d78c0a7a539a8..0113aed5cac4e 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -697,3 +697,11 @@ is_master_local <- function(master) { is_sparkR_shell <- function() { grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE) } + +.onAttach <- function(libname, pkgname) { + ast <- "************************************************************" + hiclark <- "Clark's dev version!!" 
+ date <- "Fri Aug 19 16:25:47 KST 2016" + msg <- "This is the PATCHED version" + packageStartupMessage(paste(ast, hiclark, date, msg, ast, sep="\n")) +} diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 3355dae682d7a..091baa78b90f8 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2193,6 +2193,7 @@ test_that("Histogram", { }) test_that("dapply() and dapplyCollect() on a DataFrame", { + df <- createDataFrame( list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), c("a", "b", "c")) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index c700a751c554f..f5c1f48122c45 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -29,11 +29,17 @@ elapsedSecs <- function() { # Raws (binary data) require a list column in a data.frame # argument inputData should be a list of rows, with each row a list +# Clark: Just for Testing +#r = serialize(1, NULL) +#inputData <- list(list(1, r), list(2, r), list(3, r)) +#rawcolumns <- c(FALSE, TRUE) + rbind_with_raws <- function(inputData, rawcolumns){ - args <- c(inputData, stringsAsFactors = FALSE) - inputData <- as.data.frame(do.call(rbind, args)) - inputData[!rawcolumns] <- lapply(inputData[!rawcolumns], unlist) - inputData + listmatrix <- do.call(rbind, inputData) + # A dataframe with all list columns + out <- as.data.frame(listmatrix) + out[!rawcolumns] <- lapply(out[!rawcolumns], unlist) + out } compute <- function(mode, partition, serializer, deserializer, key, @@ -48,14 +54,15 @@ compute <- function(mode, partition, serializer, deserializer, key, # TODO Clark: This didn't change error message # Handle binary data types -#rawcolumns <- ("raw" == sapply(row1, class)) -#if(any(rawcolumns)){ -# inputData <- rbind_with_raws(inputData, rawcolumns) -#} else { -# inputData <- do.call(rbind.data.frame, inputData) -#} - - inputData <- do.call(rbind.data.frame, inputData) +row1 = inputData[[1]] +rawcolumns <- ("raw" == sapply(row1, class)) +if(any(rawcolumns)){ + inputData <- rbind_with_raws(inputData, rawcolumns) +} else { + inputData <- do.call(rbind.data.frame, inputData) +} + + #inputData <- do.call(rbind.data.frame, inputData) options(stringsAsFactors = oldOpt) names(inputData) <- colNames From 70b0d44b4f5ea556db63d93e413a0cbbf30c76c2 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 11:27:30 +0900 Subject: [PATCH 12/21] tests pass! --- R/pkg/R/DataFrame.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index d15ca8629735e..b213be86f4162 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1381,7 +1381,7 @@ setMethod("dapplyCollect", # which is a serialized data.frame corresponds to one partition of the # SparkDataFrame. 
ldfs <- lapply(content, function(x) { unserialize(x[[1]]) }) -return(ldfs) +#return(ldfs) # TODO Clark: a little experiment- return early to show that # initial trouble comes from the collect method ldf <- do.call(rbind, ldfs) From b21a21df80a29e2d32c785ef7551cd2a895be8f1 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 11:47:49 +0900 Subject: [PATCH 13/21] put rbind function in utils.R --- R/pkg/R/DataFrame.R | 3 --- R/pkg/R/utils.R | 19 +++++++++----- R/pkg/inst/tests/testthat/test_sparkSQL.R | 32 ++--------------------- R/pkg/inst/worker/worker.R | 31 +++++----------------- 4 files changed, 21 insertions(+), 64 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b213be86f4162..09be06de06b52 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1381,9 +1381,6 @@ setMethod("dapplyCollect", # which is a serialized data.frame corresponds to one partition of the # SparkDataFrame. ldfs <- lapply(content, function(x) { unserialize(x[[1]]) }) -#return(ldfs) -# TODO Clark: a little experiment- return early to show that -# initial trouble comes from the collect method ldf <- do.call(rbind, ldfs) row.names(ldf) <- NULL ldf diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 0113aed5cac4e..39e2fc29ef64b 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -698,10 +698,17 @@ is_sparkR_shell <- function() { grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE) } -.onAttach <- function(libname, pkgname) { - ast <- "************************************************************" - hiclark <- "Clark's dev version!!" - date <- "Fri Aug 19 16:25:47 KST 2016" - msg <- "This is the PATCHED version" - packageStartupMessage(paste(ast, hiclark, date, msg, ast, sep="\n")) +# rbind a list of rows with raw (binary) columns +# +# @param inputData a list of rows, with each row a list +# @return data.frame with raw columns as lists +rbind_with_raws <- function(inputData){ + row1 <- inputData[[1]] + rawcolumns <- ("raw" == sapply(row1, class)) + + listmatrix <- do.call(rbind, inputData) + # A dataframe with all list columns + out <- as.data.frame(listmatrix) + out[!rawcolumns] <- lapply(out[!rawcolumns], unlist) + out } diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 091baa78b90f8..20958e66c94fa 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2249,48 +2249,20 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { }) expect_identical(expected, result) -############################################################ # Ensure UDF's operate on list columns containing arrays and bytes df_listcols <- data.frame(key = 1:3) df_listcols$bytes <- lapply(df_listcols$key, serialize, connection = NULL) -# TODO Clark: Related issue- The dataframe can't be collected if this -# column is added: + # TODO Clark: Related issue- The dataframe can't be collected if this + # column is added: #df_listcols$arr <- lapply(df_listcols$key, # function(x) seq(0, 1, length.out=15)) df_listcols_spark <- createDataFrame(df_listcols) -# class SparkDataFrame -#PATCH: SparkDataFrame[key:int, bytes:array] -#MASTER: SparkDataFrame[key:int, bytes:binary] -# So clearly MASTER is how it should be here - - -# TODO Clark: I think these warnings come from serialize. Should I try to eliminate -# them? 
-#> result1 <- collect(df_listcols_spark) -#Warning messages: -#1: closing unused connection 4 (->localhost:50241) -#2: closing unused connection 3 (->localhost:50242) - result1 <- collect(df_listcols_spark) - expect_identical(df_listcols, result1) -# PASS MASTER -# PASS PATCH result2 <- dapplyCollect(df_listcols_spark, function(x) x) -# FAIL MASTER -# FAIL PATCH -# Same error message each time: -# R computation failed with -# Error in (function (..., row.names = NULL, check.rows = FALSE, check.names = TRUE, : -# arguments imply differing number of rows: 3, 26 -# at org.apache.spark.api.r.RRunner.compute(RRunner.scala:108) - expect_equal(df_listcols, result2) - -############################################################ - }) test_that("repartition by columns on DataFrame", { diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index f5c1f48122c45..4bbc49cb2c87c 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -27,21 +27,6 @@ elapsedSecs <- function() { proc.time()[3] } -# Raws (binary data) require a list column in a data.frame -# argument inputData should be a list of rows, with each row a list -# Clark: Just for Testing -#r = serialize(1, NULL) -#inputData <- list(list(1, r), list(2, r), list(3, r)) -#rawcolumns <- c(FALSE, TRUE) - -rbind_with_raws <- function(inputData, rawcolumns){ - listmatrix <- do.call(rbind, inputData) - # A dataframe with all list columns - out <- as.data.frame(listmatrix) - out[!rawcolumns] <- lapply(out[!rawcolumns], unlist) - out -} - compute <- function(mode, partition, serializer, deserializer, key, colNames, computeFunc, inputData) { if (mode > 0) { @@ -52,17 +37,13 @@ compute <- function(mode, partition, serializer, deserializer, key, oldOpt <- getOption("stringsAsFactors") options(stringsAsFactors = FALSE) -# TODO Clark: This didn't change error message -# Handle binary data types -row1 = inputData[[1]] -rawcolumns <- ("raw" == sapply(row1, class)) -if(any(rawcolumns)){ - inputData <- rbind_with_raws(inputData, rawcolumns) -} else { - inputData <- do.call(rbind.data.frame, inputData) -} + # Handle binary data types + if("raw" %in% sapply(inputData[[1]], class){ + inputData <- SparkR:::rbind_with_raws(inputData) + } else { + inputData <- do.call(rbind.data.frame, inputData) + } - #inputData <- do.call(rbind.data.frame, inputData) options(stringsAsFactors = oldOpt) names(inputData) <- colNames From ba87b06f999860f054b25b521258216ec0b264a7 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 13:23:09 +0900 Subject: [PATCH 14/21] rename to rbindRaws and put in utils.R --- R/pkg/R/utils.R | 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 +++- R/pkg/inst/tests/testthat/test_utils.R | 9 +++++++++ R/pkg/inst/worker/worker.R | 2 +- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 39e2fc29ef64b..caaa7a8370a0b 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -702,7 +702,7 @@ is_sparkR_shell <- function() { # # @param inputData a list of rows, with each row a list # @return data.frame with raw columns as lists -rbind_with_raws <- function(inputData){ +rbindRaws <- function(inputData){ row1 <- inputData[[1]] rawcolumns <- ("raw" == sapply(row1, class)) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 20958e66c94fa..60c433e16004e 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2248,8 +2248,10 @@ test_that("dapply() and dapplyCollect() 
on a DataFrame", { x[, c("a", "b", "c")] }) expect_identical(expected, result) +}) + +test_that("dapplyCollect() on dataframe with list columns", { - # Ensure UDF's operate on list columns containing arrays and bytes df_listcols <- data.frame(key = 1:3) df_listcols$bytes <- lapply(df_listcols$key, serialize, connection = NULL) # TODO Clark: Related issue- The dataframe can't be collected if this diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R index 83e94a14322f9..bb362ffe62d0d 100644 --- a/R/pkg/inst/tests/testthat/test_utils.R +++ b/R/pkg/inst/tests/testthat/test_utils.R @@ -183,4 +183,13 @@ test_that("overrideEnvs", { expect_equal(config[["config_only"]], "ok") }) +test_that("rbindRaws", { + r <- serialize(1, connection=NULL) + inputData <- list(list(1L, r), list(2L, r), list(3L, r)) + expected <- data.frame(V1 = 1:3) + expected$V2 <- list(r, r, r) + result <- SparkR:::rbindRaws(inputData) + expect_equal(expected, result) +}) + sparkR.session.stop() diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 4bbc49cb2c87c..6e2f1bfc99d2f 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -39,7 +39,7 @@ compute <- function(mode, partition, serializer, deserializer, key, # Handle binary data types if("raw" %in% sapply(inputData[[1]], class){ - inputData <- SparkR:::rbind_with_raws(inputData) + inputData <- SparkR:::rbindRaws(inputData) } else { inputData <- do.call(rbind.data.frame, inputData) } From 528fa6e4b57a66f413488e15e8d7f15fbe3ab632 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 13:40:46 +0900 Subject: [PATCH 15/21] just whitespace --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 60c433e16004e..fc1bae403a24c 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -381,7 +381,6 @@ test_that("create DataFrame with complex types", { expect_equal(s$b, 3L) }) -# TODO Clark: Make sure this one passes test_that("create DataFrame from a data.frame with complex types", { ldf <- data.frame(row.names = 1:2) ldf$a_list <- list(list(1, 2), list(3, 4)) @@ -2193,7 +2192,6 @@ test_that("Histogram", { }) test_that("dapply() and dapplyCollect() on a DataFrame", { - df <- createDataFrame( list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")), c("a", "b", "c")) @@ -2254,10 +2252,12 @@ test_that("dapplyCollect() on dataframe with list columns", { df_listcols <- data.frame(key = 1:3) df_listcols$bytes <- lapply(df_listcols$key, serialize, connection = NULL) - # TODO Clark: Related issue- The dataframe can't be collected if this + + # TODO clarkfitzg: Related issue- The dataframe can't be collected if this # column is added: #df_listcols$arr <- lapply(df_listcols$key, # function(x) seq(0, 1, length.out=15)) + df_listcols_spark <- createDataFrame(df_listcols) result1 <- collect(df_listcols_spark) From e0a3894e7fd00285da85b523ff07957f24bb436e Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 14:16:36 +0900 Subject: [PATCH 16/21] syntax error in worker.R --- R/pkg/inst/worker/worker.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 6e2f1bfc99d2f..a533182cc7ef5 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -38,7 +38,7 @@ compute <- function(mode, partition, 
serializer, deserializer, key, options(stringsAsFactors = FALSE) # Handle binary data types - if("raw" %in% sapply(inputData[[1]], class){ + if("raw" %in% sapply(inputData[[1]], class)){ inputData <- SparkR:::rbindRaws(inputData) } else { inputData <- do.call(rbind.data.frame, inputData) From 5871257226cc904edac59ec00ca52d5106f84a13 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 14:32:01 +0900 Subject: [PATCH 17/21] satisfy lintr --- R/pkg/inst/tests/testthat/test_utils.R | 2 +- R/pkg/inst/worker/worker.R | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R index bb362ffe62d0d..ec1ffb82a25d6 100644 --- a/R/pkg/inst/tests/testthat/test_utils.R +++ b/R/pkg/inst/tests/testthat/test_utils.R @@ -184,7 +184,7 @@ test_that("overrideEnvs", { }) test_that("rbindRaws", { - r <- serialize(1, connection=NULL) + r <- serialize(1, connection = NULL) inputData <- list(list(1L, r), list(2L, r), list(3L, r)) expected <- data.frame(V1 = 1:3) expected$V2 <- list(r, r, r) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index a533182cc7ef5..cfe41ded200c2 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -38,7 +38,7 @@ compute <- function(mode, partition, serializer, deserializer, key, options(stringsAsFactors = FALSE) # Handle binary data types - if("raw" %in% sapply(inputData[[1]], class)){ + if ("raw" %in% sapply(inputData[[1]], class)) { inputData <- SparkR:::rbindRaws(inputData) } else { inputData <- do.call(rbind.data.frame, inputData) From 84ef4cc9a265b9e9a61e75fbbd5b22aa6c767a80 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 24 Aug 2016 16:15:19 +0900 Subject: [PATCH 18/21] address sun-rui's comments --- R/pkg/R/SQLContext.R | 3 ++- R/pkg/inst/tests/testthat/test_utils.R | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index c19ecf93903c6..839f4b080eb0c 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -184,8 +184,9 @@ getDefaultSqlSource <- function() { createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) { sparkSession <- getSparkSession() - # Convert dataframes into a list of rows. Each row is a list if (is.data.frame(data)) { + # Convert data into a list of rows. Each row is a list. 
+ # get the names of columns, they will be put into RDD if (is.null(schema)) { schema <- names(data) diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R index ec1ffb82a25d6..2fe7a49be08f3 100644 --- a/R/pkg/inst/tests/testthat/test_utils.R +++ b/R/pkg/inst/tests/testthat/test_utils.R @@ -188,7 +188,7 @@ test_that("rbindRaws", { inputData <- list(list(1L, r), list(2L, r), list(3L, r)) expected <- data.frame(V1 = 1:3) expected$V2 <- list(r, r, r) - result <- SparkR:::rbindRaws(inputData) + result <- rbindRaws(inputData) expect_equal(expected, result) }) From 0c2a2151cfc0902772711cc8fa22206c6e71348f Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 31 Aug 2016 18:04:47 +0900 Subject: [PATCH 19/21] Change names from list_cols -> binary per felixcheung's feedback --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index fc1bae403a24c..bef65a937b81e 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2248,23 +2248,22 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { expect_identical(expected, result) }) -test_that("dapplyCollect() on dataframe with list columns", { +test_that("dapplyCollect() on dataframe with a binary column", { - df_listcols <- data.frame(key = 1:3) - df_listcols$bytes <- lapply(df_listcols$key, serialize, connection = NULL) + df <- data.frame(key = 1:3) + df$bytes <- lapply(df$key, serialize, connection = NULL) # TODO clarkfitzg: Related issue- The dataframe can't be collected if this # column is added: - #df_listcols$arr <- lapply(df_listcols$key, - # function(x) seq(0, 1, length.out=15)) + #df$arr <- lapply(df$key, function(x) seq(0, 1, length.out=15)) - df_listcols_spark <- createDataFrame(df_listcols) + df_spark <- createDataFrame(df) - result1 <- collect(df_listcols_spark) - expect_identical(df_listcols, result1) + result1 <- collect(df_spark) + expect_identical(df, result1) - result2 <- dapplyCollect(df_listcols_spark, function(x) x) - expect_equal(df_listcols, result2) + result2 <- dapplyCollect(df_spark, function(x) x) + expect_equal(df, result2) }) test_that("repartition by columns on DataFrame", { From 77fa9b4bb121455d51b43ba8705d876e2549850c Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Thu, 1 Sep 2016 07:57:34 +0900 Subject: [PATCH 20/21] fix R style fail --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 ---- 1 file changed, 4 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index bef65a937b81e..4c7e6f4980378 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2253,10 +2253,6 @@ test_that("dapplyCollect() on dataframe with a binary column", { df <- data.frame(key = 1:3) df$bytes <- lapply(df$key, serialize, connection = NULL) - # TODO clarkfitzg: Related issue- The dataframe can't be collected if this - # column is added: - #df$arr <- lapply(df$key, function(x) seq(0, 1, length.out=15)) - df_spark <- createDataFrame(df) result1 <- collect(df_spark) From 91d69be8ed0ff133a1a3f7f304fd6e366d46bca5 Mon Sep 17 00:00:00 2001 From: Clark Fitzgerald Date: Wed, 7 Sep 2016 15:04:18 +0900 Subject: [PATCH 21/21] tests for a single binary column --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 +++++++++-- R/pkg/inst/tests/testthat/test_utils.R | 21 ++++++++++++++++++--- 2 
files changed, 27 insertions(+), 5 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 4c7e6f4980378..c1faf58d30df5 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2248,7 +2248,7 @@ test_that("dapply() and dapplyCollect() on a DataFrame", { expect_identical(expected, result) }) -test_that("dapplyCollect() on dataframe with a binary column", { +test_that("dapplyCollect() on DataFrame with a binary column", { df <- data.frame(key = 1:3) df$bytes <- lapply(df$key, serialize, connection = NULL) @@ -2259,7 +2259,14 @@ test_that("dapplyCollect() on dataframe with a binary column", { expect_identical(df, result1) result2 <- dapplyCollect(df_spark, function(x) x) - expect_equal(df, result2) + expect_identical(df, result2) + + # A data.frame with a single column of bytes + scb <- subset(df, select = "bytes") + scb_spark <- createDataFrame(scb) + result <- dapplyCollect(scb_spark, function(x) x) + expect_identical(scb, result) + }) test_that("repartition by columns on DataFrame", { diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R index 2fe7a49be08f3..77f25292f3f29 100644 --- a/R/pkg/inst/tests/testthat/test_utils.R +++ b/R/pkg/inst/tests/testthat/test_utils.R @@ -184,12 +184,27 @@ test_that("overrideEnvs", { }) test_that("rbindRaws", { - r <- serialize(1, connection = NULL) - inputData <- list(list(1L, r), list(2L, r), list(3L, r)) + + # Mixed Column types + r <- serialize(1:5, connection = NULL) + r1 <- serialize(1, connection = NULL) + r2 <- serialize(letters, connection = NULL) + r3 <- serialize(1:10, connection = NULL) + inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r), + list(3L, r3, "c", r)) expected <- data.frame(V1 = 1:3) - expected$V2 <- list(r, r, r) + expected$V2 <- list(r1, r2, r3) + expected$V3 <- c("a", "b", "c") + expected$V4 <- list(r, r, r) result <- rbindRaws(inputData) expect_equal(expected, result) + + # Single binary column + input <- list(list(r1), list(r2), list(r3)) + expected <- subset(expected, select = "V2") + result <- setNames(rbindRaws(input), "V2") + expect_equal(expected, result) + }) sparkR.session.stop()
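
The helper the series converges on, rbindRaws(), is self-contained base R:
it is added to R/pkg/R/utils.R in PATCH 13, renamed in PATCH 14, and unit
tested in PATCH 21, so it can be exercised outside Spark. A minimal
standalone sketch, with the function body copied from the patched utils.R
and sample data modeled on the rbindRaws test in test_utils.R:

    # rbind a list of rows with raw (binary) columns
    #
    # @param inputData a list of rows, with each row a list
    # @return data.frame with raw columns as lists
    rbindRaws <- function(inputData) {
      row1 <- inputData[[1]]
      rawcolumns <- ("raw" == sapply(row1, class))

      # rbind on lists yields a matrix of mode list; as.data.frame then
      # produces a data.frame in which every column is a list column
      listmatrix <- do.call(rbind, inputData)
      out <- as.data.frame(listmatrix)

      # unlist the non-raw columns back into atomic vectors
      out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
      out
    }

    r <- serialize(1, connection = NULL)                # a raw vector
    rows <- list(list(1L, r), list(2L, r), list(3L, r))
    df <- rbindRaws(rows)
    str(df)  # $V1: int 1 2 3; $V2: a list column holding the three raws

A raw vector cannot be an atomic data.frame column, so the binary data has
to come back as a list column. Calling do.call(rbind.data.frame, rows) on
the same input would instead treat each length-n raw vector as n rows and
fail with the "arguments imply differing number of rows" error captured in
PATCH 05.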