From 5bde8cd7498912cecd8b2f169b60ab7cc86c7436 Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Thu, 8 Oct 2015 14:23:43 -0400 Subject: [PATCH 1/7] SparkR joins. Used DataFrame.R from Spark 1.5.1 because of changes to collect() and orderBy() functions --- R/pkg/R/DataFrame.R | 182 ++++++++++--------------------- R/pkg/inst/tests/test_sparkSQL.R | 168 +++++----------------------- 2 files changed, 88 insertions(+), 262 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 1b9137e6c7934..dd5168d5abde9 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -271,7 +271,7 @@ setMethod("names<-", signature(x = "DataFrame"), function(x, value) { if (!is.null(value)) { - sdf <- callJMethod(x@sdf, "toDF", as.list(value)) + sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value))) dataFrame(sdf) } }) @@ -652,49 +652,18 @@ setMethod("dim", setMethod("collect", signature(x = "DataFrame"), function(x, stringsAsFactors = FALSE) { - names <- columns(x) - ncol <- length(names) - if (ncol <= 0) { - # empty data.frame with 0 columns and 0 rows - data.frame() - } else { - # listCols is a list of columns - listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf) - stopifnot(length(listCols) == ncol) - - # An empty data.frame with 0 columns and number of rows as collected - nrow <- length(listCols[[1]]) - if (nrow <= 0) { - df <- data.frame() - } else { - df <- data.frame(row.names = 1 : nrow) - } - - # Append columns one by one - for (colIndex in 1 : ncol) { - # Note: appending a column of list type into a data.frame so that - # data of complex type can be held. But getting a cell from a column - # of list type returns a list instead of a vector. So for columns of - # non-complex type, append them as vector. - col <- listCols[[colIndex]] - if (length(col) <= 0) { - df[[names[colIndex]]] <- col - } else { - # TODO: more robust check on column of primitive types - vec <- do.call(c, col) - if (class(vec) != "list") { - df[[names[colIndex]]] <- vec - } else { - # For columns of complex type, be careful to access them. - # Get a column of complex type returns a list. - # Get a cell from a column of complex type returns a list instead of a vector. - df[[names[colIndex]]] <- col - } - } - } - df - } - }) + # listCols is a list of raw vectors, one per column + listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf) + cols <- lapply(listCols, function(col) { + objRaw <- rawConnection(col) + numRows <- readInt(objRaw) + col <- readCol(objRaw, numRows) + close(objRaw) + col + }) + names(cols) <- columns(x) + do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors)) + }) #' Limit #' @@ -843,10 +812,10 @@ setMethod("groupBy", function(x, ...) { cols <- list(...) if (length(cols) >= 1 && class(cols[[1]]) == "character") { - sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], cols[-1]) + sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], listToSeq(cols[-1])) } else { jcol <- lapply(cols, function(c) { c@jc }) - sgd <- callJMethod(x@sdf, "groupBy", jcol) + sgd <- callJMethod(x@sdf, "groupBy", listToSeq(jcol)) } groupedData(sgd) }) @@ -1075,20 +1044,12 @@ setMethod("subset", signature(x = "DataFrame"), #' select(df, c("col1", "col2")) #' select(df, list(df$name, df$age + 1)) #' # Similar to R data frames columns can also be selected using `$` -#' df[,df$age] +#' df$age #' } setMethod("select", signature(x = "DataFrame", col = "character"), function(x, col, ...) 
{ - if (length(col) > 1) { - if (length(list(...)) > 0) { - stop("To select multiple columns, use a character vector or list for col") - } - - select(x, as.list(col)) - } else { - sdf <- callJMethod(x@sdf, "select", col, list(...)) - dataFrame(sdf) - } + sdf <- callJMethod(x@sdf, "select", col, toSeq(...)) + dataFrame(sdf) }) #' @rdname select @@ -1098,7 +1059,7 @@ setMethod("select", signature(x = "DataFrame", col = "Column"), jcols <- lapply(list(col, ...), function(c) { c@jc }) - sdf <- callJMethod(x@sdf, "select", jcols) + sdf <- callJMethod(x@sdf, "select", listToSeq(jcols)) dataFrame(sdf) }) @@ -1114,7 +1075,7 @@ setMethod("select", col(c)@jc } }) - sdf <- callJMethod(x@sdf, "select", cols) + sdf <- callJMethod(x@sdf, "select", listToSeq(cols)) dataFrame(sdf) }) @@ -1141,7 +1102,7 @@ setMethod("selectExpr", signature(x = "DataFrame", expr = "character"), function(x, expr, ...) { exprList <- list(expr, ...) - sdf <- callJMethod(x@sdf, "selectExpr", exprList) + sdf <- callJMethod(x@sdf, "selectExpr", listToSeq(exprList)) dataFrame(sdf) }) @@ -1298,10 +1259,8 @@ setClassUnion("characterOrColumn", c("character", "Column")) #' Sort a DataFrame by the specified column(s). #' #' @param x A DataFrame to be sorted. -#' @param col A character or Column object vector indicating the fields to sort on +#' @param col Either a Column object or character vector indicating the field to sort on #' @param ... Additional sorting fields -#' @param decreasing A logical argument indicating sorting order for columns when -#' a character vector is specified for col #' @return A DataFrame where all elements are sorted. #' @rdname arrange #' @name arrange @@ -1314,50 +1273,21 @@ setClassUnion("characterOrColumn", c("character", "Column")) #' path <- "path/to/file.json" #' df <- jsonFile(sqlContext, path) #' arrange(df, df$col1) +#' arrange(df, "col1") #' arrange(df, asc(df$col1), desc(abs(df$col2))) -#' arrange(df, "col1", decreasing = TRUE) -#' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE)) #' } setMethod("arrange", - signature(x = "DataFrame", col = "Column"), + signature(x = "DataFrame", col = "characterOrColumn"), function(x, col, ...) { + if (class(col) == "character") { + sdf <- callJMethod(x@sdf, "sort", col, toSeq(...)) + } else if (class(col) == "Column") { jcols <- lapply(list(col, ...), function(c) { c@jc }) - - sdf <- callJMethod(x@sdf, "sort", jcols) - dataFrame(sdf) - }) - -#' @rdname arrange -#' @export -setMethod("arrange", - signature(x = "DataFrame", col = "character"), - function(x, col, ..., decreasing = FALSE) { - - # all sorting columns - by <- list(col, ...) 
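# A usage sketch for the arrange() method defined above (the Spark 1.5.1 form,
# which accepts column names or Column objects but has no `decreasing`
# argument); `df` is assumed to be a DataFrame with "name" and "age" columns,
# as in the package tests:
sorted_by_name <- arrange(df, "name")
sorted_mixed   <- arrange(df, asc(df$age), desc(df$name))
head(collect(sorted_by_name))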
- - if (length(decreasing) == 1) { - # in case only 1 boolean argument - decreasing value is specified, - # it will be used for all columns - decreasing <- rep(decreasing, length(by)) - } else if (length(decreasing) != length(by)) { - stop("Arguments 'col' and 'decreasing' must have the same length") + sdf <- callJMethod(x@sdf, "sort", listToSeq(jcols)) } - - # builds a list of columns of type Column - # example: [[1]] Column Species ASC - # [[2]] Column Petal_Length DESC - jcols <- lapply(seq_len(length(decreasing)), function(i){ - if (decreasing[[i]]) { - desc(getColumn(x, by[[i]])) - } else { - asc(getColumn(x, by[[i]])) - } - }) - - do.call("arrange", c(x, jcols)) + dataFrame(sdf) }) #' @rdname arrange @@ -1441,11 +1371,11 @@ setMethod("join", if (is.null(joinType)) { sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc) } else { - if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) { + if (joinType %in% c("inner", "outer", "full", "fullouter", "leftouter", "left", "rightouter", "right", "leftsemi")) { sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType) } else { stop("joinType must be one of the following types: ", - "'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'") + "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi'") } } } @@ -1703,7 +1633,7 @@ setMethod("describe", signature(x = "DataFrame", col = "character"), function(x, col, ...) { colList <- list(col, ...) - sdf <- callJMethod(x@sdf, "describe", colList) + sdf <- callJMethod(x@sdf, "describe", listToSeq(colList)) dataFrame(sdf) }) @@ -1713,7 +1643,7 @@ setMethod("describe", signature(x = "DataFrame"), function(x) { colList <- as.list(c(columns(x))) - sdf <- callJMethod(x@sdf, "describe", colList) + sdf <- callJMethod(x@sdf, "describe", listToSeq(colList)) dataFrame(sdf) }) @@ -1770,7 +1700,7 @@ setMethod("dropna", naFunctions <- callJMethod(x@sdf, "na") sdf <- callJMethod(naFunctions, "drop", - as.integer(minNonNulls), as.list(cols)) + as.integer(minNonNulls), listToSeq(as.list(cols))) dataFrame(sdf) }) @@ -1854,30 +1784,36 @@ setMethod("fillna", sdf <- if (length(cols) == 0) { callJMethod(naFunctions, "fill", value) } else { - callJMethod(naFunctions, "fill", value, as.list(cols)) + callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols))) } dataFrame(sdf) }) -#' This function downloads the contents of a DataFrame into an R's data.frame. -#' Since data.frames are held in memory, ensure that you have enough memory -#' in your system to accommodate the contents. +#' crosstab #' -#' @title Download data from a DataFrame into a data.frame -#' @param x a DataFrame -#' @return a data.frame -#' @rdname as.data.frame -#' @examples \dontrun{ +#' Computes a pair-wise frequency table of the given columns. Also known as a contingency +#' table. The number of distinct values for each column should be less than 1e4. At most 1e6 +#' non-zero pair frequencies will be returned. #' -#' irisDF <- createDataFrame(sqlContext, iris) -#' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ]) +#' @param col1 name of the first column. Distinct items will make the first item of each row. +#' @param col2 name of the second column. Distinct items will make the column names of the output. +#' @return a local R data.frame representing the contingency table. The first column of each row +#' will be the distinct values of `col1` and the column names will be the distinct values +#' of `col2`. The name of the first column will be `$col1_$col2`. 
Pairs that have no +#' occurrences will have zero as their counts. +#' +#' @rdname statfunctions +#' @name crosstab +#' @export +#' @examples +#' \dontrun{ +#' df <- jsonFile(sqlCtx, "/path/to/file.json") +#' ct = crosstab(df, "title", "gender") #' } -setMethod("as.data.frame", - signature(x = "DataFrame"), - function(x, ...) { - # Check if additional parameters have been passed - if (length(list(...)) > 0) { - stop(paste("Unused argument(s): ", paste(list(...), collapse=", "))) - } - collect(x) +setMethod("crosstab", + signature(x = "DataFrame", col1 = "character", col2 = "character"), + function(x, col1, col2) { + statFunctions <- callJMethod(x@sdf, "stat") + sct <- callJMethod(statFunctions, "crosstab", col1, col2) + collect(dataFrame(sct)) }) diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index e85de2507085c..8152692f75f98 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -49,32 +49,26 @@ mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp") writeLines(mockLinesNa, jsonPathNa) -# For test complex types in DataFrame -mockLinesComplexType <- - c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", - "{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", - "{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") -complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") -writeLines(mockLinesComplexType, complexTypeJsonPath) - -test_that("infer types and check types", { +test_that("infer types", { expect_equal(infer_type(1L), "integer") expect_equal(infer_type(1.0), "double") expect_equal(infer_type("abc"), "string") expect_equal(infer_type(TRUE), "boolean") expect_equal(infer_type(as.Date("2015-03-11")), "date") expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp") - expect_equal(infer_type(c(1L, 2L)), "array") - expect_equal(infer_type(list(1L, 2L)), "array") + expect_equal(infer_type(c(1L, 2L)), + list(type = "array", elementType = "integer", containsNull = TRUE)) + expect_equal(infer_type(list(1L, 2L)), + list(type = "array", elementType = "integer", containsNull = TRUE)) testStruct <- infer_type(list(a = 1L, b = "2")) expect_equal(class(testStruct), "structType") checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE) checkStructField(testStruct$fields()[[2]], "b", "StringType", TRUE) e <- new.env() assign("a", 1L, envir = e) - expect_equal(infer_type(e), "map") - - expect_error(checkType("map"), "Key type in a map must be string or character") + expect_equal(infer_type(e), + list(type = "map", keyType = "string", valueType = "integer", + valueContainsNull = TRUE)) }) test_that("structType and structField", { @@ -242,7 +236,8 @@ test_that("create DataFrame with different data types", { expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE)) }) -test_that("create DataFrame with nested array and map", { +# TODO: enable this test after fix serialization for nested object +#test_that("create DataFrame with nested array and struct", { # e <- new.env() # assign("n", 3L, envir = e) # l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L)) @@ -252,64 +247,7 @@ test_that("create DataFrame with nested array and map", { # expect_equal(count(df), 1) # ldf <- collect(df) # expect_equal(ldf[1,], l[[1]]) - - # ArrayType and MapType - e <- new.env() - assign("n", 3L, envir = e) - - l <- list(as.list(1:10), list("a", "b"), e) - df <- 
createDataFrame(sqlContext, list(l), c("a", "b", "c")) - expect_equal(dtypes(df), list(c("a", "array"), - c("b", "array"), - c("c", "map"))) - expect_equal(count(df), 1) - ldf <- collect(df) - expect_equal(names(ldf), c("a", "b", "c")) - expect_equal(ldf[1, 1][[1]], l[[1]]) - expect_equal(ldf[1, 2][[1]], l[[2]]) - e <- ldf$c[[1]] - expect_equal(class(e), "environment") - expect_equal(ls(e), "n") - expect_equal(e$n, 3L) -}) - -# For test map type in DataFrame -mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", - "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", - "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") -mapTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") -writeLines(mockLinesMapType, mapTypeJsonPath) - -test_that("Collect DataFrame with complex types", { - # ArrayType - df <- jsonFile(sqlContext, complexTypeJsonPath) - - ldf <- collect(df) - expect_equal(nrow(ldf), 3) - expect_equal(ncol(ldf), 3) - expect_equal(names(ldf), c("c1", "c2", "c3")) - expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list (7, 8, 9))) - expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list ("g", "h", "i"))) - expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list (7.0, 8.0, 9.0))) - - # MapType - schema <- structType(structField("name", "string"), - structField("info", "map")) - df <- read.df(sqlContext, mapTypeJsonPath, "json", schema) - expect_equal(dtypes(df), list(c("name", "string"), - c("info", "map"))) - ldf <- collect(df) - expect_equal(nrow(ldf), 3) - expect_equal(ncol(ldf), 2) - expect_equal(names(ldf), c("name", "info")) - expect_equal(ldf$name, c("Bob", "Alice", "David")) - bob <- ldf$info[[1]] - expect_equal(class(bob), "environment") - expect_equal(bob$age, 16) - expect_equal(bob$height, 176.5) - - # TODO: tests for StructType after it is supported -}) +#}) test_that("jsonFile() on a local file returns a DataFrame", { df <- jsonFile(sqlContext, jsonPath) @@ -493,32 +431,6 @@ test_that("collect() and take() on a DataFrame return the same number of rows an expect_equal(ncol(collect(df)), ncol(take(df, 10))) }) -test_that("collect() support Unicode characters", { - markUtf8 <- function(s) { - Encoding(s) <- "UTF-8" - s - } - - lines <- c("{\"name\":\"안녕하세요\"}", - "{\"name\":\"您好\", \"age\":30}", - "{\"name\":\"こんにちは\", \"age\":19}", - "{\"name\":\"Xin chào\"}") - - jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") - writeLines(lines, jsonPath) - - df <- read.df(sqlContext, jsonPath, "json") - rdf <- collect(df) - expect_true(is.data.frame(rdf)) - expect_equal(rdf$name[1], markUtf8("안녕하세요")) - expect_equal(rdf$name[2], markUtf8("您好")) - expect_equal(rdf$name[3], markUtf8("こんにちは")) - expect_equal(rdf$name[4], markUtf8("Xin chào")) - - df1 <- createDataFrame(sqlContext, rdf) - expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好")) -}) - test_that("multiple pipeline transformations result in an RDD with the correct values", { df <- jsonFile(sqlContext, jsonPath) first <- lapply(df, function(row) { @@ -673,13 +585,6 @@ test_that("select with column", { expect_equal(columns(df3), c("x")) expect_equal(count(df3), 3) expect_equal(collect(select(df3, "x"))[[1, 1]], "x") - - df4 <- select(df, c("name", "age")) - expect_equal(columns(df4), c("name", "age")) - expect_equal(count(df4), 3) - - expect_error(select(df, c("name", "age"), "name"), - "To select multiple columns, use a character vector or list for col") }) test_that("subsetting", { @@ -703,7 
+608,7 @@ test_that("subsetting", { df4 <- df[df$age %in% c(19, 30), 1:2] expect_equal(count(df4), 2) expect_equal(columns(df4), c("name", "age")) - + df5 <- df[df$age %in% c(19), c(1,2)] expect_equal(count(df5), 1) expect_equal(columns(df5), c("name", "age")) @@ -989,7 +894,7 @@ test_that("arrange() and orderBy() on a DataFrame", { sorted <- arrange(df, df$age) expect_equal(collect(sorted)[1,2], "Michael") - sorted2 <- arrange(df, "name", decreasing = FALSE) + sorted2 <- arrange(df, "name") expect_equal(collect(sorted2)[2,"age"], 19) sorted3 <- orderBy(df, asc(df$age)) @@ -999,15 +904,6 @@ test_that("arrange() and orderBy() on a DataFrame", { sorted4 <- orderBy(df, desc(df$name)) expect_equal(first(sorted4)$name, "Michael") expect_equal(collect(sorted4)[3,"name"], "Andy") - - sorted5 <- arrange(df, "age", "name", decreasing = TRUE) - expect_equal(collect(sorted5)[1,2], "Andy") - - sorted6 <- arrange(df, "age","name", decreasing = c(T, F)) - expect_equal(collect(sorted6)[1,2], "Andy") - - sorted7 <- arrange(df, "name", decreasing = FALSE) - expect_equal(collect(sorted7)[2,"age"], 19) }) test_that("filter() on a DataFrame", { @@ -1049,7 +945,7 @@ test_that("join() and merge() on a DataFrame", { expect_equal(names(joined2), c("age", "name", "name", "test")) expect_equal(count(joined2), 3) - joined3 <- join(df, df2, df$name == df2$name, "right_outer") + joined3 <- join(df, df2, df$name == df2$name, "rightouter") expect_equal(names(joined3), c("age", "name", "name", "test")) expect_equal(count(joined3), 4) expect_true(is.na(collect(orderBy(joined3, joined3$age))$age[2])) @@ -1059,12 +955,25 @@ test_that("join() and merge() on a DataFrame", { expect_equal(names(joined4), c("newAge", "name", "test")) expect_equal(count(joined4), 4) expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24) + + joined5 <- join(df, df2, df$name == df2$name, "leftouter") + expect_equal(names(joined5), c("age", "name", "name", "test")) + expect_equal(count(joined5), 3) + expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1])) + + joined6 <- join(df, df2, df$name == df2$name, "inner") + expect_equal(names(joined6), c("age", "name", "name", "test")) + expect_equal(count(joined6), 3) + + joined7 <- join(df, df2, df$name == df2$name, "leftsemi") + expect_equal(names(joined7), c("age", "name")) + expect_equal(count(joined7, 3)) merged <- select(merge(df, df2, df$name == df2$name, "outer"), alias(df$age + 5, "newAge"), df$name, df2$test) expect_equal(names(merged), c("newAge", "name", "test")) expect_equal(count(merged), 4) - expect_equal(collect(orderBy(merged, joined4$name))$newAge[3], 24) + expect_equal(collect(orderBy(merged, merged$name))$newAge[3], 24) }) test_that("toJSON() returns an RDD of the correct values", { @@ -1195,7 +1104,7 @@ test_that("describe() and summarize() on a DataFrame", { stats <- describe(df, "age") expect_equal(collect(stats)[1, "summary"], "count") expect_equal(collect(stats)[2, "age"], "24.5") - expect_equal(collect(stats)[3, "age"], "7.7781745930520225") + expect_equal(collect(stats)[3, "age"], "5.5") stats <- describe(df) expect_equal(collect(stats)[4, "name"], "Andy") expect_equal(collect(stats)[5, "age"], "30") @@ -1338,30 +1247,11 @@ test_that("crosstab() on a DataFrame", { expect_identical(expected, ordered) }) -test_that("cov() and corr() on a DataFrame", { - l <- lapply(c(0:9), function(x) { list(x, x * 2.0) }) - df <- createDataFrame(sqlContext, l, c("singles", "doubles")) - result <- cov(df, "singles", "doubles") - expect_true(abs(result - 55.0 / 3) < 1e-12) - - 
result <- corr(df, "singles", "doubles") - expect_true(abs(result - 1.0) < 1e-12) - result <- corr(df, "singles", "doubles", "pearson") - expect_true(abs(result - 1.0) < 1e-12) -}) - test_that("SQL error message is returned from JVM", { retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e) expect_equal(grepl("Table Not Found: blah", retError), TRUE) }) -test_that("Method as.data.frame as a synonym for collect()", { - irisDF <- createDataFrame(sqlContext, iris) - expect_equal(as.data.frame(irisDF), collect(irisDF)) - irisDF2 <- irisDF[irisDF$Species == "setosa", ] - expect_equal(as.data.frame(irisDF2), collect(irisDF2)) -}) - unlink(parquetPath) unlink(jsonPath) unlink(jsonPathNa) From d4a1ed391795881d2fd82cba3c01aaa26e3ace20 Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Fri, 9 Oct 2015 09:28:32 -0400 Subject: [PATCH 2/7] Pulled from main Spark respository and changed join and test functions on join --- R/pkg/R/DataFrame.R | 178 +++++++++++++++++++++---------- R/pkg/inst/tests/test_sparkSQL.R | 151 +++++++++++++++++++++++--- 2 files changed, 258 insertions(+), 71 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index dd5168d5abde9..d105422637c03 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -271,7 +271,7 @@ setMethod("names<-", signature(x = "DataFrame"), function(x, value) { if (!is.null(value)) { - sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value))) + sdf <- callJMethod(x@sdf, "toDF", as.list(value)) dataFrame(sdf) } }) @@ -652,18 +652,49 @@ setMethod("dim", setMethod("collect", signature(x = "DataFrame"), function(x, stringsAsFactors = FALSE) { - # listCols is a list of raw vectors, one per column - listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf) - cols <- lapply(listCols, function(col) { - objRaw <- rawConnection(col) - numRows <- readInt(objRaw) - col <- readCol(objRaw, numRows) - close(objRaw) - col - }) - names(cols) <- columns(x) - do.call(cbind.data.frame, list(cols, stringsAsFactors = stringsAsFactors)) - }) + names <- columns(x) + ncol <- length(names) + if (ncol <= 0) { + # empty data.frame with 0 columns and 0 rows + data.frame() + } else { + # listCols is a list of columns + listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf) + stopifnot(length(listCols) == ncol) + + # An empty data.frame with 0 columns and number of rows as collected + nrow <- length(listCols[[1]]) + if (nrow <= 0) { + df <- data.frame() + } else { + df <- data.frame(row.names = 1 : nrow) + } + + # Append columns one by one + for (colIndex in 1 : ncol) { + # Note: appending a column of list type into a data.frame so that + # data of complex type can be held. But getting a cell from a column + # of list type returns a list instead of a vector. So for columns of + # non-complex type, append them as vector. + col <- listCols[[colIndex]] + if (length(col) <= 0) { + df[[names[colIndex]]] <- col + } else { + # TODO: more robust check on column of primitive types + vec <- do.call(c, col) + if (class(vec) != "list") { + df[[names[colIndex]]] <- vec + } else { + # For columns of complex type, be careful to access them. + # Get a column of complex type returns a list. + # Get a cell from a column of complex type returns a list instead of a vector. + df[[names[colIndex]]] <- col + } + } + } + df + } + }) #' Limit #' @@ -812,10 +843,10 @@ setMethod("groupBy", function(x, ...) { cols <- list(...) 
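# A brief usage sketch for groupBy() with the master-branch calling convention
# restored here (plain R lists are passed through to the JVM side); assumes a
# DataFrame `df` with a "name" column, as in the package tests:
gd <- groupBy(df, "name")
name_counts <- collect(count(gd))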
if (length(cols) >= 1 && class(cols[[1]]) == "character") { - sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], listToSeq(cols[-1])) + sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], cols[-1]) } else { jcol <- lapply(cols, function(c) { c@jc }) - sgd <- callJMethod(x@sdf, "groupBy", listToSeq(jcol)) + sgd <- callJMethod(x@sdf, "groupBy", jcol) } groupedData(sgd) }) @@ -1044,12 +1075,20 @@ setMethod("subset", signature(x = "DataFrame"), #' select(df, c("col1", "col2")) #' select(df, list(df$name, df$age + 1)) #' # Similar to R data frames columns can also be selected using `$` -#' df$age +#' df[,df$age] #' } setMethod("select", signature(x = "DataFrame", col = "character"), function(x, col, ...) { - sdf <- callJMethod(x@sdf, "select", col, toSeq(...)) - dataFrame(sdf) + if (length(col) > 1) { + if (length(list(...)) > 0) { + stop("To select multiple columns, use a character vector or list for col") + } + + select(x, as.list(col)) + } else { + sdf <- callJMethod(x@sdf, "select", col, list(...)) + dataFrame(sdf) + } }) #' @rdname select @@ -1059,7 +1098,7 @@ setMethod("select", signature(x = "DataFrame", col = "Column"), jcols <- lapply(list(col, ...), function(c) { c@jc }) - sdf <- callJMethod(x@sdf, "select", listToSeq(jcols)) + sdf <- callJMethod(x@sdf, "select", jcols) dataFrame(sdf) }) @@ -1075,7 +1114,7 @@ setMethod("select", col(c)@jc } }) - sdf <- callJMethod(x@sdf, "select", listToSeq(cols)) + sdf <- callJMethod(x@sdf, "select", cols) dataFrame(sdf) }) @@ -1102,7 +1141,7 @@ setMethod("selectExpr", signature(x = "DataFrame", expr = "character"), function(x, expr, ...) { exprList <- list(expr, ...) - sdf <- callJMethod(x@sdf, "selectExpr", listToSeq(exprList)) + sdf <- callJMethod(x@sdf, "selectExpr", exprList) dataFrame(sdf) }) @@ -1259,8 +1298,10 @@ setClassUnion("characterOrColumn", c("character", "Column")) #' Sort a DataFrame by the specified column(s). #' #' @param x A DataFrame to be sorted. -#' @param col Either a Column object or character vector indicating the field to sort on +#' @param col A character or Column object vector indicating the fields to sort on #' @param ... Additional sorting fields +#' @param decreasing A logical argument indicating sorting order for columns when +#' a character vector is specified for col #' @return A DataFrame where all elements are sorted. #' @rdname arrange #' @name arrange @@ -1273,23 +1314,52 @@ setClassUnion("characterOrColumn", c("character", "Column")) #' path <- "path/to/file.json" #' df <- jsonFile(sqlContext, path) #' arrange(df, df$col1) -#' arrange(df, "col1") #' arrange(df, asc(df$col1), desc(abs(df$col2))) +#' arrange(df, "col1", decreasing = TRUE) +#' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE)) #' } setMethod("arrange", - signature(x = "DataFrame", col = "characterOrColumn"), + signature(x = "DataFrame", col = "Column"), function(x, col, ...) { - if (class(col) == "character") { - sdf <- callJMethod(x@sdf, "sort", col, toSeq(...)) - } else if (class(col) == "Column") { jcols <- lapply(list(col, ...), function(c) { c@jc }) - sdf <- callJMethod(x@sdf, "sort", listToSeq(jcols)) - } + + sdf <- callJMethod(x@sdf, "sort", jcols) dataFrame(sdf) }) +#' @rdname arrange +#' @export +setMethod("arrange", + signature(x = "DataFrame", col = "character"), + function(x, col, ..., decreasing = FALSE) { + + # all sorting columns + by <- list(col, ...) 
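# Usage sketch for the restored character-column arrange(), which expands the
# `decreasing` flags into per-column asc()/desc() Columns below; the column
# names are assumptions taken from the roxygen examples above:
arrange(df, "col1", decreasing = TRUE)
arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))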
+ + if (length(decreasing) == 1) { + # in case only 1 boolean argument - decreasing value is specified, + # it will be used for all columns + decreasing <- rep(decreasing, length(by)) + } else if (length(decreasing) != length(by)) { + stop("Arguments 'col' and 'decreasing' must have the same length") + } + + # builds a list of columns of type Column + # example: [[1]] Column Species ASC + # [[2]] Column Petal_Length DESC + jcols <- lapply(seq_len(length(decreasing)), function(i){ + if (decreasing[[i]]) { + desc(getColumn(x, by[[i]])) + } else { + asc(getColumn(x, by[[i]])) + } + }) + + do.call("arrange", c(x, jcols)) + }) + #' @rdname arrange #' @name orderby setMethod("orderBy", @@ -1633,7 +1703,7 @@ setMethod("describe", signature(x = "DataFrame", col = "character"), function(x, col, ...) { colList <- list(col, ...) - sdf <- callJMethod(x@sdf, "describe", listToSeq(colList)) + sdf <- callJMethod(x@sdf, "describe", colList) dataFrame(sdf) }) @@ -1643,7 +1713,7 @@ setMethod("describe", signature(x = "DataFrame"), function(x) { colList <- as.list(c(columns(x))) - sdf <- callJMethod(x@sdf, "describe", listToSeq(colList)) + sdf <- callJMethod(x@sdf, "describe", colList) dataFrame(sdf) }) @@ -1700,7 +1770,7 @@ setMethod("dropna", naFunctions <- callJMethod(x@sdf, "na") sdf <- callJMethod(naFunctions, "drop", - as.integer(minNonNulls), listToSeq(as.list(cols))) + as.integer(minNonNulls), as.list(cols)) dataFrame(sdf) }) @@ -1784,36 +1854,30 @@ setMethod("fillna", sdf <- if (length(cols) == 0) { callJMethod(naFunctions, "fill", value) } else { - callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols))) + callJMethod(naFunctions, "fill", value, as.list(cols)) } dataFrame(sdf) }) -#' crosstab -#' -#' Computes a pair-wise frequency table of the given columns. Also known as a contingency -#' table. The number of distinct values for each column should be less than 1e4. At most 1e6 -#' non-zero pair frequencies will be returned. +#' This function downloads the contents of a DataFrame into an R's data.frame. +#' Since data.frames are held in memory, ensure that you have enough memory +#' in your system to accommodate the contents. #' -#' @param col1 name of the first column. Distinct items will make the first item of each row. -#' @param col2 name of the second column. Distinct items will make the column names of the output. -#' @return a local R data.frame representing the contingency table. The first column of each row -#' will be the distinct values of `col1` and the column names will be the distinct values -#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no -#' occurrences will have zero as their counts. +#' @title Download data from a DataFrame into a data.frame +#' @param x a DataFrame +#' @return a data.frame +#' @rdname as.data.frame +#' @examples \dontrun{ #' -#' @rdname statfunctions -#' @name crosstab -#' @export -#' @examples -#' \dontrun{ -#' df <- jsonFile(sqlCtx, "/path/to/file.json") -#' ct = crosstab(df, "title", "gender") +#' irisDF <- createDataFrame(sqlContext, iris) +#' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ]) #' } -setMethod("crosstab", - signature(x = "DataFrame", col1 = "character", col2 = "character"), - function(x, col1, col2) { - statFunctions <- callJMethod(x@sdf, "stat") - sct <- callJMethod(statFunctions, "crosstab", col1, col2) - collect(dataFrame(sct)) +setMethod("as.data.frame", + signature(x = "DataFrame"), + function(x, ...) 
{ + # Check if additional parameters have been passed + if (length(list(...)) > 0) { + stop(paste("Unused argument(s): ", paste(list(...), collapse=", "))) + } + collect(x) }) diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 8152692f75f98..0961cea1881ba 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -49,26 +49,32 @@ mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp") writeLines(mockLinesNa, jsonPathNa) -test_that("infer types", { +# For test complex types in DataFrame +mockLinesComplexType <- + c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}", + "{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}", + "{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}") +complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") +writeLines(mockLinesComplexType, complexTypeJsonPath) + +test_that("infer types and check types", { expect_equal(infer_type(1L), "integer") expect_equal(infer_type(1.0), "double") expect_equal(infer_type("abc"), "string") expect_equal(infer_type(TRUE), "boolean") expect_equal(infer_type(as.Date("2015-03-11")), "date") expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp") - expect_equal(infer_type(c(1L, 2L)), - list(type = "array", elementType = "integer", containsNull = TRUE)) - expect_equal(infer_type(list(1L, 2L)), - list(type = "array", elementType = "integer", containsNull = TRUE)) + expect_equal(infer_type(c(1L, 2L)), "array") + expect_equal(infer_type(list(1L, 2L)), "array") testStruct <- infer_type(list(a = 1L, b = "2")) expect_equal(class(testStruct), "structType") checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE) checkStructField(testStruct$fields()[[2]], "b", "StringType", TRUE) e <- new.env() assign("a", 1L, envir = e) - expect_equal(infer_type(e), - list(type = "map", keyType = "string", valueType = "integer", - valueContainsNull = TRUE)) + expect_equal(infer_type(e), "map") + + expect_error(checkType("map"), "Key type in a map must be string or character") }) test_that("structType and structField", { @@ -236,8 +242,7 @@ test_that("create DataFrame with different data types", { expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE)) }) -# TODO: enable this test after fix serialization for nested object -#test_that("create DataFrame with nested array and struct", { +test_that("create DataFrame with nested array and map", { # e <- new.env() # assign("n", 3L, envir = e) # l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L)) @@ -247,7 +252,64 @@ test_that("create DataFrame with different data types", { # expect_equal(count(df), 1) # ldf <- collect(df) # expect_equal(ldf[1,], l[[1]]) -#}) + + # ArrayType and MapType + e <- new.env() + assign("n", 3L, envir = e) + + l <- list(as.list(1:10), list("a", "b"), e) + df <- createDataFrame(sqlContext, list(l), c("a", "b", "c")) + expect_equal(dtypes(df), list(c("a", "array"), + c("b", "array"), + c("c", "map"))) + expect_equal(count(df), 1) + ldf <- collect(df) + expect_equal(names(ldf), c("a", "b", "c")) + expect_equal(ldf[1, 1][[1]], l[[1]]) + expect_equal(ldf[1, 2][[1]], l[[2]]) + e <- ldf$c[[1]] + expect_equal(class(e), "environment") + expect_equal(ls(e), "n") + expect_equal(e$n, 3L) +}) + +# For test map type in DataFrame +mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", + 
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", + "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") +mapTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") +writeLines(mockLinesMapType, mapTypeJsonPath) + +test_that("Collect DataFrame with complex types", { + # ArrayType + df <- jsonFile(sqlContext, complexTypeJsonPath) + + ldf <- collect(df) + expect_equal(nrow(ldf), 3) + expect_equal(ncol(ldf), 3) + expect_equal(names(ldf), c("c1", "c2", "c3")) + expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list (7, 8, 9))) + expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list ("g", "h", "i"))) + expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list (7.0, 8.0, 9.0))) + + # MapType + schema <- structType(structField("name", "string"), + structField("info", "map")) + df <- read.df(sqlContext, mapTypeJsonPath, "json", schema) + expect_equal(dtypes(df), list(c("name", "string"), + c("info", "map"))) + ldf <- collect(df) + expect_equal(nrow(ldf), 3) + expect_equal(ncol(ldf), 2) + expect_equal(names(ldf), c("name", "info")) + expect_equal(ldf$name, c("Bob", "Alice", "David")) + bob <- ldf$info[[1]] + expect_equal(class(bob), "environment") + expect_equal(bob$age, 16) + expect_equal(bob$height, 176.5) + + # TODO: tests for StructType after it is supported +}) test_that("jsonFile() on a local file returns a DataFrame", { df <- jsonFile(sqlContext, jsonPath) @@ -431,6 +493,32 @@ test_that("collect() and take() on a DataFrame return the same number of rows an expect_equal(ncol(collect(df)), ncol(take(df, 10))) }) +test_that("collect() support Unicode characters", { + markUtf8 <- function(s) { + Encoding(s) <- "UTF-8" + s + } + + lines <- c("{\"name\":\"안녕하세요\"}", + "{\"name\":\"您好\", \"age\":30}", + "{\"name\":\"こんにちは\", \"age\":19}", + "{\"name\":\"Xin chào\"}") + + jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp") + writeLines(lines, jsonPath) + + df <- read.df(sqlContext, jsonPath, "json") + rdf <- collect(df) + expect_true(is.data.frame(rdf)) + expect_equal(rdf$name[1], markUtf8("안녕하세요")) + expect_equal(rdf$name[2], markUtf8("您好")) + expect_equal(rdf$name[3], markUtf8("こんにちは")) + expect_equal(rdf$name[4], markUtf8("Xin chào")) + + df1 <- createDataFrame(sqlContext, rdf) + expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好")) +}) + test_that("multiple pipeline transformations result in an RDD with the correct values", { df <- jsonFile(sqlContext, jsonPath) first <- lapply(df, function(row) { @@ -585,6 +673,13 @@ test_that("select with column", { expect_equal(columns(df3), c("x")) expect_equal(count(df3), 3) expect_equal(collect(select(df3, "x"))[[1, 1]], "x") + + df4 <- select(df, c("name", "age")) + expect_equal(columns(df4), c("name", "age")) + expect_equal(count(df4), 3) + + expect_error(select(df, c("name", "age"), "name"), + "To select multiple columns, use a character vector or list for col") }) test_that("subsetting", { @@ -608,7 +703,7 @@ test_that("subsetting", { df4 <- df[df$age %in% c(19, 30), 1:2] expect_equal(count(df4), 2) expect_equal(columns(df4), c("name", "age")) - + df5 <- df[df$age %in% c(19), c(1,2)] expect_equal(count(df5), 1) expect_equal(columns(df5), c("name", "age")) @@ -894,7 +989,7 @@ test_that("arrange() and orderBy() on a DataFrame", { sorted <- arrange(df, df$age) expect_equal(collect(sorted)[1,2], "Michael") - sorted2 <- arrange(df, "name") + sorted2 <- arrange(df, "name", decreasing = FALSE) expect_equal(collect(sorted2)[2,"age"], 19) sorted3 <- 
orderBy(df, asc(df$age)) @@ -904,6 +999,15 @@ test_that("arrange() and orderBy() on a DataFrame", { sorted4 <- orderBy(df, desc(df$name)) expect_equal(first(sorted4)$name, "Michael") expect_equal(collect(sorted4)[3,"name"], "Andy") + + sorted5 <- arrange(df, "age", "name", decreasing = TRUE) + expect_equal(collect(sorted5)[1,2], "Andy") + + sorted6 <- arrange(df, "age","name", decreasing = c(T, F)) + expect_equal(collect(sorted6)[1,2], "Andy") + + sorted7 <- arrange(df, "name", decreasing = FALSE) + expect_equal(collect(sorted7)[2,"age"], 19) }) test_that("filter() on a DataFrame", { @@ -1104,7 +1208,7 @@ test_that("describe() and summarize() on a DataFrame", { stats <- describe(df, "age") expect_equal(collect(stats)[1, "summary"], "count") expect_equal(collect(stats)[2, "age"], "24.5") - expect_equal(collect(stats)[3, "age"], "5.5") + expect_equal(collect(stats)[3, "age"], "7.7781745930520225") stats <- describe(df) expect_equal(collect(stats)[4, "name"], "Andy") expect_equal(collect(stats)[5, "age"], "30") @@ -1247,11 +1351,30 @@ test_that("crosstab() on a DataFrame", { expect_identical(expected, ordered) }) +test_that("cov() and corr() on a DataFrame", { + l <- lapply(c(0:9), function(x) { list(x, x * 2.0) }) + df <- createDataFrame(sqlContext, l, c("singles", "doubles")) + result <- cov(df, "singles", "doubles") + expect_true(abs(result - 55.0 / 3) < 1e-12) + + result <- corr(df, "singles", "doubles") + expect_true(abs(result - 1.0) < 1e-12) + result <- corr(df, "singles", "doubles", "pearson") + expect_true(abs(result - 1.0) < 1e-12) +}) + test_that("SQL error message is returned from JVM", { retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e) expect_equal(grepl("Table Not Found: blah", retError), TRUE) }) +test_that("Method as.data.frame as a synonym for collect()", { + irisDF <- createDataFrame(sqlContext, iris) + expect_equal(as.data.frame(irisDF), collect(irisDF)) + irisDF2 <- irisDF[irisDF$Species == "setosa", ] + expect_equal(as.data.frame(irisDF2), collect(irisDF2)) +}) + unlink(parquetPath) unlink(jsonPath) unlink(jsonPathNa) From efa072caaaf9c4805e63da22a9342504a292c08c Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Fri, 9 Oct 2015 10:01:21 -0400 Subject: [PATCH 3/7] joined7 typo expect_equal(count(joined7, 3)) was changed to expect_equal(count(joined7), 3) --- R/pkg/inst/tests/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 0961cea1881ba..fc604fa746d1b 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -1071,7 +1071,7 @@ test_that("join() and merge() on a DataFrame", { joined7 <- join(df, df2, df$name == df2$name, "leftsemi") expect_equal(names(joined7), c("age", "name")) - expect_equal(count(joined7, 3)) + expect_equal(count(joined7), 3) merged <- select(merge(df, df2, df$name == df2$name, "outer"), alias(df$age + 5, "newAge"), df$name, df2$test) From 216be3780d909ee6ac8d217cf34cb7dec073793e Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Fri, 9 Oct 2015 10:53:51 -0400 Subject: [PATCH 4/7] Fixed style issues to pass lintr tests --- R/pkg/R/DataFrame.R | 11 +++++++---- R/pkg/inst/tests/test_sparkSQL.R | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index d105422637c03..146911cfd3390 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1414,9 +1414,10 @@ setMethod("where", #' @param x A Spark DataFrame #' @param y A Spark 
DataFrame #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a -#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join +#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join #' @param joinType The type of join to perform. The following join types are available: -#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner". +#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left', +#' 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner". #' @return A DataFrame containing the result of the join operation. #' @rdname join #' @name join @@ -1441,11 +1442,13 @@ setMethod("join", if (is.null(joinType)) { sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc) } else { - if (joinType %in% c("inner", "outer", "full", "fullouter", "leftouter", "left", "rightouter", "right", "leftsemi")) { + if (joinType %in% c("inner", "outer", "full", "fullouter", + "leftouter", "left", "rightouter", "right", "leftsemi")) { sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType) } else { stop("joinType must be one of the following types: ", - "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi'") + "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', + 'rightouter', 'right', 'leftsemi'") } } } diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index fc604fa746d1b..3840210e8c34b 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -1059,16 +1059,16 @@ test_that("join() and merge() on a DataFrame", { expect_equal(names(joined4), c("newAge", "name", "test")) expect_equal(count(joined4), 4) expect_equal(collect(orderBy(joined4, joined4$name))$newAge[3], 24) - + joined5 <- join(df, df2, df$name == df2$name, "leftouter") expect_equal(names(joined5), c("age", "name", "name", "test")) expect_equal(count(joined5), 3) expect_true(is.na(collect(orderBy(joined5, joined5$age))$age[1])) - + joined6 <- join(df, df2, df$name == df2$name, "inner") expect_equal(names(joined6), c("age", "name", "name", "test")) expect_equal(count(joined6), 3) - + joined7 <- join(df, df2, df$name == df2$name, "leftsemi") expect_equal(names(joined7), c("age", "name")) expect_equal(count(joined7), 3) From 96037228048a290cfc07bde9a991bec373d36d1b Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Tue, 13 Oct 2015 15:38:50 -0400 Subject: [PATCH 5/7] Add support for right_outer and left_outer --- R/pkg/R/DataFrame.R | 11 ++++++----- R/pkg/inst/tests/test_sparkSQL.R | 10 ++++++++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 146911cfd3390..3f18f501a37f3 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1416,8 +1416,8 @@ setMethod("where", #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a #' Column expression. If joinExpr is omitted, join() will perform a Cartesian join #' @param joinType The type of join to perform. The following join types are available: -#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left', -#' 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner". +#' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left', +#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner". #' @return A DataFrame containing the result of the join operation. 
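#'
#' An illustrative call using one of the underscore aliases documented above
#' (assumes DataFrames df and df2 that share a "name" column, as in the tests):
#' joined <- join(df, df2, df$name == df2$name, "left_outer")
#' collect(joined)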
#' @rdname join #' @name join @@ -1443,12 +1443,13 @@ setMethod("join", sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc) } else { if (joinType %in% c("inner", "outer", "full", "fullouter", - "leftouter", "left", "rightouter", "right", "leftsemi")) { + "leftouter", "left_outer", "left", "rightouter", "right_outer", "right", "leftsemi")) { + joinType <- gsub("_", "", joinType) sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType) } else { stop("joinType must be one of the following types: ", - "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', - 'rightouter', 'right', 'leftsemi'") + "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left', + 'rightouter', 'right_outer', 'right', 'leftsemi'") } } } diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 3840210e8c34b..ed10ed3939cef 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -1073,6 +1073,16 @@ test_that("join() and merge() on a DataFrame", { expect_equal(names(joined7), c("age", "name")) expect_equal(count(joined7), 3) + joined8 <- join(df, df2, df$name == df2$name, "left_outer") + expect_equal(names(joined8), c("age", "name", "name", "test")) + expect_equal(count(joined8), 3) + expect_true(is.na(collect(orderBy(joined8, joined8$age))$age[1])) + + joined9 <- join(df, df2, df$name == df2$name, "right_outer") + expect_equal(names(joined9), c("age", "name", "name", "test")) + expect_equal(count(joined9), 4) + expect_true(is.na(collect(orderBy(joined9, joined9$age))$age[2])) + merged <- select(merge(df, df2, df$name == df2$name, "outer"), alias(df$age + 5, "newAge"), df$name, df2$test) expect_equal(names(merged), c("newAge", "name", "test")) From a67965ae9322303511a2b1f2fb3a2d2be043fb85 Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Tue, 13 Oct 2015 15:50:23 -0400 Subject: [PATCH 6/7] Fixing R style errors --- R/pkg/R/DataFrame.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 3f18f501a37f3..ecc2d509fc13e 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1443,8 +1443,9 @@ setMethod("join", sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc) } else { if (joinType %in% c("inner", "outer", "full", "fullouter", - "leftouter", "left_outer", "left", "rightouter", "right_outer", "right", "leftsemi")) { - joinType <- gsub("_", "", joinType) + "leftouter", "left_outer", "left", + "rightouter", "right_outer", "right", "leftsemi")) { + joinType <- gsub("_", "", joinType) sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType) } else { stop("joinType must be one of the following types: ", From d4eff64c67452b166e86b2bc3d9a2486b8f18657 Mon Sep 17 00:00:00 2001 From: Monica Liu Date: Tue, 13 Oct 2015 16:09:12 -0400 Subject: [PATCH 7/7] Actually fixes R style errors, I think --- R/pkg/R/DataFrame.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index ecc2d509fc13e..53b2887b52604 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1443,9 +1443,9 @@ setMethod("join", sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc) } else { if (joinType %in% c("inner", "outer", "full", "fullouter", - "leftouter", "left_outer", "left", - "rightouter", "right_outer", "right", "leftsemi")) { - joinType <- gsub("_", "", joinType) + "leftouter", "left_outer", "left", + "rightouter", "right_outer", "right", "leftsemi")) { + joinType <- gsub("_", "", joinType) sdf <- callJMethod(x@sdf, "join", 
y@sdf, joinExpr@jc, joinType) } else { stop("joinType must be one of the following types: ",
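# A closing usage sketch covering the join types this series enables end to
# end; the session setup and toy data below are assumptions for illustration
# only, not part of the patch:
sqlContext <- sparkRSQL.init(sc)
df  <- createDataFrame(sqlContext,
                       data.frame(name = c("Andy", "Justin", "Bob"),
                                  age = c(30, 19, 16), stringsAsFactors = FALSE))
df2 <- createDataFrame(sqlContext,
                       data.frame(name = c("Andy", "Bob", "Carol"),
                                  test = c("yes", "no", "yes"), stringsAsFactors = FALSE))
count(join(df, df2, df$name == df2$name, "inner"))       # matching names only
count(join(df, df2, df$name == df2$name, "left_outer"))  # "_" is stripped to "leftouter"
count(join(df, df2, df$name == df2$name, "leftsemi"))    # keeps only df's columns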