4 changes: 4 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -183,7 +183,10 @@ getDefaultSqlSource <- function() {
# TODO(davies): support sampling and infer type from NA
createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
sparkSession <- getSparkSession()

if (is.data.frame(data)) {
# Convert data into a list of rows. Each row is a list.

# get the names of columns, they will be put into RDD
if (is.null(schema)) {
schema <- names(data)
@@ -208,6 +211,7 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
data <- do.call(mapply, append(args, data))
}

if (is.list(data)) {
sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
rdd <- parallelize(sc, data)
15 changes: 15 additions & 0 deletions R/pkg/R/utils.R
@@ -697,3 +697,18 @@ is_master_local <- function(master) {
is_sparkR_shell <- function() {
grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}

# rbind a list of rows with raw (binary) columns
#
# @param inputData a list of rows, with each row a list
# @return data.frame with raw columns as lists
rbindRaws <- function(inputData){
row1 <- inputData[[1]]
rawcolumns <- ("raw" == sapply(row1, class))

listmatrix <- do.call(rbind, inputData)
Contributor

Do you know what happens if we have a mixed set of columns here? Say one column is "raw", one is "integer" and one is "character". From reading some docs, it looks like everything is converted to a character matrix when we use rbind.

I think we have two choices if that's the case:
(a) we apply the type conversions after rbind
(b) we only call this method when all columns are raw

Contributor Author

> b = serialize(1:10, NULL)
> inputData = list(list(1L, b, 'a'), list(2L, b, 'b'))  # Mixed data types
> listmatrix <- do.call(rbind, inputData)
> listmatrix
     [,1] [,2]   [,3]
[1,] 1    Raw,62 "a"
[2,] 2    Raw,62 "b"
> class(listmatrix)
[1] "matrix"
> typeof(listmatrix)
[1] "list"
> is.character(listmatrix)
[1] FALSE

A little unusual: it's a list matrix, hence the name. Which docs are you referring to?

The test that's in here now does test for mixed columns, but it doesn't test for a single column of raws. I'll add that now.

Contributor

I was looking at https://stat.ethz.ch/R-manual/R-devel/library/base/html/cbind.html, specifically the Value section, which says:

"The type of a matrix result determined from the highest type of any of the inputs in the hierarchy raw < logical < integer < double < complex < character < list."

Member

I think the correct class is maintained:

> sapply(listmatrix, class)
[1] "integer"   "integer"   "raw"       "raw"       "character" "character"
> sapply(listmatrix, typeof)
[1] "integer"   "integer"   "raw"       "raw"       "character" "character"

Contributor

Ah I see - the types are inside the listmatrix. Thanks @clarkfitzg for clarifying. Let us know once you have added the test for a single column of raw as well.

Contributor Author

Since every element of inputData is a list, the result goes straight to the top of the hierarchy, the same as if you had called rbind(list1, list2, ...).
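To make the point of this thread concrete, here is a minimal sketch in plain R (no Spark required) contrasting the two cases discussed: rbind on atomic vectors, which coerces to the highest type, and rbind on lists, which yields a list matrix whose cells keep their own types. The variable names `atomic` and `rows` are illustrative and not part of the PR.

```r
# Atomic vectors: rbind coerces everything to the highest type present,
# which is character in this case.
atomic <- rbind(c(1L, 2L), c("a", "b"))
typeof(atomic)              # "character"

# Lists: each row is already a list, so the result is a list matrix.
# Every cell keeps its original type, including raw vectors.
b <- serialize(1:10, connection = NULL)
rows <- list(list(1L, b, "a"), list(2L, b, "b"))
listmatrix <- do.call(rbind, rows)
typeof(listmatrix)          # "list"
dim(listmatrix)             # 2 3
class(listmatrix[[1, 2]])   # "raw"
```

This is why option (a) from the first comment, converting types after rbind, is not needed for the list-of-lists input that the worker receives.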

# A dataframe with all list columns
out <- as.data.frame(listmatrix)
out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
out
}
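For readers who want to try the helper outside SparkR, the following is a standalone sketch that copies the body of rbindRaws from this diff under the hypothetical name `rbind_raws_sketch` and runs it on a small mixed-type input. It assumes rows are unnamed lists with the same column layout, as in the PR's tests.

```r
# Standalone copy of the helper from this diff (name is hypothetical).
rbind_raws_sketch <- function(inputData) {
  row1 <- inputData[[1]]
  # Detect raw columns from the first row only.
  rawcolumns <- ("raw" == sapply(row1, class))

  # rbind on lists gives a list matrix; as.data.frame turns each
  # matrix column into a list column.
  listmatrix <- do.call(rbind, inputData)
  out <- as.data.frame(listmatrix)

  # Flatten the non-raw columns back into atomic vectors.
  out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
  out
}

r1 <- serialize(1, connection = NULL)
r2 <- serialize(letters, connection = NULL)
result <- rbind_raws_sketch(list(list(1L, r1, "a"), list(2L, r2, "b")))

# Inspect the resulting column classes: V2 stays a list of raw vectors.
sapply(result, class)
```

Note that the raw columns are identified from the first row alone, so the sketch (like the original) relies on every row having raw values in the same positions.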
21 changes: 21 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2248,6 +2248,27 @@ test_that("dapply() and dapplyCollect() on a DataFrame", {
expect_identical(expected, result)
})

test_that("dapplyCollect() on DataFrame with a binary column", {

df <- data.frame(key = 1:3)
df$bytes <- lapply(df$key, serialize, connection = NULL)

df_spark <- createDataFrame(df)

result1 <- collect(df_spark)
expect_identical(df, result1)

result2 <- dapplyCollect(df_spark, function(x) x)
expect_identical(df, result2)

# A data.frame with a single column of bytes
scb <- subset(df, select = "bytes")
scb_spark <- createDataFrame(scb)
result <- dapplyCollect(scb_spark, function(x) x)
expect_identical(scb, result)

})
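The test above builds its binary column with serialize. As a small local illustration (plain R, no Spark session), this sketch shows what that list column looks like and that the bytes round-trip through unserialize. The name `restored` is introduced here only for the example.

```r
# Build a data.frame with an integer column and a list column of raw vectors,
# the same way the test does.
df <- data.frame(key = 1:3)
df$bytes <- lapply(df$key, serialize, connection = NULL)

# The binary column is a list whose elements are raw vectors.
sapply(df, class)
class(df$bytes[[1]])          # "raw"

# Each element decodes back to the key it was created from.
restored <- vapply(df$bytes, unserialize, integer(1))
identical(restored, df$key)   # TRUE
```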

test_that("repartition by columns on DataFrame", {
df <- createDataFrame(
list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
24 changes: 24 additions & 0 deletions R/pkg/inst/tests/testthat/test_utils.R
@@ -183,4 +183,28 @@ test_that("overrideEnvs", {
expect_equal(config[["config_only"]], "ok")
})

test_that("rbindRaws", {

# Mixed Column types
r <- serialize(1:5, connection = NULL)
r1 <- serialize(1, connection = NULL)
r2 <- serialize(letters, connection = NULL)
r3 <- serialize(1:10, connection = NULL)
inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r),
list(3L, r3, "c", r))
expected <- data.frame(V1 = 1:3)
expected$V2 <- list(r1, r2, r3)
expected$V3 <- c("a", "b", "c")
expected$V4 <- list(r, r, r)
result <- rbindRaws(inputData)
expect_equal(expected, result)

# Single binary column
input <- list(list(r1), list(r2), list(r3))
expected <- subset(expected, select = "V2")
result <- setNames(rbindRaws(input), "V2")
Contributor Author

@shivaram Here's the new test. I made the other ones a bit more general also.

expect_equal(expected, result)

})

sparkR.session.stop()
9 changes: 8 additions & 1 deletion R/pkg/inst/worker/worker.R
@@ -36,7 +36,14 @@ compute <- function(mode, partition, serializer, deserializer, key,
# available since R 3.2.4. So we set the global option here.
oldOpt <- getOption("stringsAsFactors")
options(stringsAsFactors = FALSE)
inputData <- do.call(rbind.data.frame, inputData)

# Handle binary data types
if ("raw" %in% sapply(inputData[[1]], class)) {
inputData <- SparkR:::rbindRaws(inputData)
Contributor

Same as above: SparkR::: is not needed.

Contributor Author

True, but looking through the rest of worker.R it seems that using ::: is the convention in this file?

Contributor Author

I ran the tests locally without it, and it appears to be necessary here.

Contributor @sun-rui, Aug 24, 2016

No, it is not the preferred style in worker.R. It seems those usages slipped through a previous code review.

    suppressPackageStartupMessages(library(SparkR))

should be moved to the front of worker.R, and then SparkR::: can be removed. A lot of "SparkR:::" is annoying.

Contributor Author

When I remove SparkR::: and run the tests locally, I get an error saying rbindRaws was not found.

So it looks like the SparkR::: needs to be there to access private functions in SparkR from the worker node.

Contributor Author

And rbindRaws doesn't need to be exported.

Contributor

Sure. "SparkR:::" is needed for private functions.
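The conclusion of this thread, that unexported functions are reachable only through `:::`, can be checked with any installed package. The sketch below uses the stats namespace as a stand-in for SparkR (an assumption made so the example runs without Spark) and relies on the documented equivalents of the two operators: `pkg::name` behaves like getExportedValue, and `pkg:::name` behaves like get on the namespace environment.

```r
# Use the stats namespace as a stand-in for SparkR (illustrative only).
ns <- asNamespace("stats")

# Names defined in the namespace but not exported, i.e. "private" objects.
private <- setdiff(ls(ns), getNamespaceExports(ns))
name <- private[1]

# `::` only sees exports, so looking up a private name fails.
exported <- tryCatch(getExportedValue("stats", name),
                     error = function(e) NULL)
is.null(exported)

# `:::` looks inside the namespace itself, so the same name is found.
internal <- get(name, envir = ns, inherits = FALSE)
```

Attaching the package with library() exposes only the exported names, which matches the error reported above when the `SparkR:::` prefix was dropped for rbindRaws.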

} else {
inputData <- do.call(rbind.data.frame, inputData)
}

options(stringsAsFactors = oldOpt)

names(inputData) <- colNames