Skip to content

Commit 514d3d2

Browse files
committed
Merge branch 'master' into expand-nest-join
2 parents 9055d8a + a140dd7 commit 514d3d2

File tree

208 files changed

+4421
-4820
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

208 files changed

+4421
-4820
lines changed

R/pkg/R/DataFrame.R

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ setMethod("names<-",
271271
signature(x = "DataFrame"),
272272
function(x, value) {
273273
if (!is.null(value)) {
274-
sdf <- callJMethod(x@sdf, "toDF", listToSeq(as.list(value)))
274+
sdf <- callJMethod(x@sdf, "toDF", as.list(value))
275275
dataFrame(sdf)
276276
}
277277
})
@@ -843,10 +843,10 @@ setMethod("groupBy",
843843
function(x, ...) {
844844
cols <- list(...)
845845
if (length(cols) >= 1 && class(cols[[1]]) == "character") {
846-
sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], listToSeq(cols[-1]))
846+
sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], cols[-1])
847847
} else {
848848
jcol <- lapply(cols, function(c) { c@jc })
849-
sgd <- callJMethod(x@sdf, "groupBy", listToSeq(jcol))
849+
sgd <- callJMethod(x@sdf, "groupBy", jcol)
850850
}
851851
groupedData(sgd)
852852
})
@@ -1079,7 +1079,7 @@ setMethod("subset", signature(x = "DataFrame"),
10791079
#' }
10801080
setMethod("select", signature(x = "DataFrame", col = "character"),
10811081
function(x, col, ...) {
1082-
sdf <- callJMethod(x@sdf, "select", col, toSeq(...))
1082+
sdf <- callJMethod(x@sdf, "select", col, list(...))
10831083
dataFrame(sdf)
10841084
})
10851085

@@ -1090,7 +1090,7 @@ setMethod("select", signature(x = "DataFrame", col = "Column"),
10901090
jcols <- lapply(list(col, ...), function(c) {
10911091
c@jc
10921092
})
1093-
sdf <- callJMethod(x@sdf, "select", listToSeq(jcols))
1093+
sdf <- callJMethod(x@sdf, "select", jcols)
10941094
dataFrame(sdf)
10951095
})
10961096

@@ -1106,7 +1106,7 @@ setMethod("select",
11061106
col(c)@jc
11071107
}
11081108
})
1109-
sdf <- callJMethod(x@sdf, "select", listToSeq(cols))
1109+
sdf <- callJMethod(x@sdf, "select", cols)
11101110
dataFrame(sdf)
11111111
})
11121112

@@ -1133,7 +1133,7 @@ setMethod("selectExpr",
11331133
signature(x = "DataFrame", expr = "character"),
11341134
function(x, expr, ...) {
11351135
exprList <- list(expr, ...)
1136-
sdf <- callJMethod(x@sdf, "selectExpr", listToSeq(exprList))
1136+
sdf <- callJMethod(x@sdf, "selectExpr", exprList)
11371137
dataFrame(sdf)
11381138
})
11391139

@@ -1311,12 +1311,12 @@ setMethod("arrange",
13111311
signature(x = "DataFrame", col = "characterOrColumn"),
13121312
function(x, col, ...) {
13131313
if (class(col) == "character") {
1314-
sdf <- callJMethod(x@sdf, "sort", col, toSeq(...))
1314+
sdf <- callJMethod(x@sdf, "sort", col, list(...))
13151315
} else if (class(col) == "Column") {
13161316
jcols <- lapply(list(col, ...), function(c) {
13171317
c@jc
13181318
})
1319-
sdf <- callJMethod(x@sdf, "sort", listToSeq(jcols))
1319+
sdf <- callJMethod(x@sdf, "sort", jcols)
13201320
}
13211321
dataFrame(sdf)
13221322
})
@@ -1664,7 +1664,7 @@ setMethod("describe",
16641664
signature(x = "DataFrame", col = "character"),
16651665
function(x, col, ...) {
16661666
colList <- list(col, ...)
1667-
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
1667+
sdf <- callJMethod(x@sdf, "describe", colList)
16681668
dataFrame(sdf)
16691669
})
16701670

@@ -1674,7 +1674,7 @@ setMethod("describe",
16741674
signature(x = "DataFrame"),
16751675
function(x) {
16761676
colList <- as.list(c(columns(x)))
1677-
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
1677+
sdf <- callJMethod(x@sdf, "describe", colList)
16781678
dataFrame(sdf)
16791679
})
16801680

@@ -1731,7 +1731,7 @@ setMethod("dropna",
17311731

17321732
naFunctions <- callJMethod(x@sdf, "na")
17331733
sdf <- callJMethod(naFunctions, "drop",
1734-
as.integer(minNonNulls), listToSeq(as.list(cols)))
1734+
as.integer(minNonNulls), as.list(cols))
17351735
dataFrame(sdf)
17361736
})
17371737

@@ -1815,7 +1815,7 @@ setMethod("fillna",
18151815
sdf <- if (length(cols) == 0) {
18161816
callJMethod(naFunctions, "fill", value)
18171817
} else {
1818-
callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
1818+
callJMethod(naFunctions, "fill", value, as.list(cols))
18191819
}
18201820
dataFrame(sdf)
18211821
})

R/pkg/R/SQLContext.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ infer_type <- function(x) {
4949
stopifnot(length(x) > 0)
5050
names <- names(x)
5151
if (is.null(names)) {
52-
list(type = "array", elementType = infer_type(x[[1]]), containsNull = TRUE)
52+
paste0("array<", infer_type(x[[1]]), ">")
5353
} else {
5454
# StructType
5555
types <- lapply(x, infer_type)
@@ -59,7 +59,7 @@ infer_type <- function(x) {
5959
do.call(structType, fields)
6060
}
6161
} else if (length(x) > 1) {
62-
list(type = "array", elementType = type, containsNull = TRUE)
62+
paste0("array<", infer_type(x[[1]]), ">")
6363
} else {
6464
type
6565
}

R/pkg/R/column.R

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,7 @@ setMethod("cast",
211211
setMethod("%in%",
212212
signature(x = "Column"),
213213
function(x, table) {
214-
table <- listToSeq(as.list(table))
215-
jc <- callJMethod(x@jc, "in", table)
214+
jc <- callJMethod(x@jc, "in", as.list(table))
216215
return(column(jc))
217216
})
218217

R/pkg/R/functions.R

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,7 +1331,7 @@ setMethod("countDistinct",
13311331
x@jc
13321332
})
13331333
jc <- callJStatic("org.apache.spark.sql.functions", "countDistinct", x@jc,
1334-
listToSeq(jcol))
1334+
jcol)
13351335
column(jc)
13361336
})
13371337

@@ -1348,7 +1348,7 @@ setMethod("concat",
13481348
signature(x = "Column"),
13491349
function(x, ...) {
13501350
jcols <- lapply(list(x, ...), function(x) { x@jc })
1351-
jc <- callJStatic("org.apache.spark.sql.functions", "concat", listToSeq(jcols))
1351+
jc <- callJStatic("org.apache.spark.sql.functions", "concat", jcols)
13521352
column(jc)
13531353
})
13541354

@@ -1366,7 +1366,7 @@ setMethod("greatest",
13661366
function(x, ...) {
13671367
stopifnot(length(list(...)) > 0)
13681368
jcols <- lapply(list(x, ...), function(x) { x@jc })
1369-
jc <- callJStatic("org.apache.spark.sql.functions", "greatest", listToSeq(jcols))
1369+
jc <- callJStatic("org.apache.spark.sql.functions", "greatest", jcols)
13701370
column(jc)
13711371
})
13721372

@@ -1384,7 +1384,7 @@ setMethod("least",
13841384
function(x, ...) {
13851385
stopifnot(length(list(...)) > 0)
13861386
jcols <- lapply(list(x, ...), function(x) { x@jc })
1387-
jc <- callJStatic("org.apache.spark.sql.functions", "least", listToSeq(jcols))
1387+
jc <- callJStatic("org.apache.spark.sql.functions", "least", jcols)
13881388
column(jc)
13891389
})
13901390

@@ -1675,7 +1675,7 @@ setMethod("shiftRightUnsigned", signature(y = "Column", x = "numeric"),
16751675
#' @export
16761676
setMethod("concat_ws", signature(sep = "character", x = "Column"),
16771677
function(sep, x, ...) {
1678-
jcols <- listToSeq(lapply(list(x, ...), function(x) { x@jc }))
1678+
jcols <- lapply(list(x, ...), function(x) { x@jc })
16791679
jc <- callJStatic("org.apache.spark.sql.functions", "concat_ws", sep, jcols)
16801680
column(jc)
16811681
})
@@ -1723,7 +1723,7 @@ setMethod("expr", signature(x = "character"),
17231723
#' @export
17241724
setMethod("format_string", signature(format = "character", x = "Column"),
17251725
function(format, x, ...) {
1726-
jcols <- listToSeq(lapply(list(x, ...), function(arg) { arg@jc }))
1726+
jcols <- lapply(list(x, ...), function(arg) { arg@jc })
17271727
jc <- callJStatic("org.apache.spark.sql.functions",
17281728
"format_string",
17291729
format, jcols)

R/pkg/R/group.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ setMethod("agg",
102102
}
103103
}
104104
jcols <- lapply(cols, function(c) { c@jc })
105-
sdf <- callJMethod(x@sgd, "agg", jcols[[1]], listToSeq(jcols[-1]))
105+
sdf <- callJMethod(x@sgd, "agg", jcols[[1]], jcols[-1])
106106
} else {
107107
stop("agg can only support Column or character")
108108
}
@@ -124,7 +124,7 @@ createMethod <- function(name) {
124124
setMethod(name,
125125
signature(x = "GroupedData"),
126126
function(x, ...) {
127-
sdf <- callJMethod(x@sgd, name, toSeq(...))
127+
sdf <- callJMethod(x@sgd, name, list(...))
128128
dataFrame(sdf)
129129
})
130130
}

R/pkg/R/schema.R

Lines changed: 34 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ structType.structField <- function(x, ...) {
5656
})
5757
stObj <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
5858
"createStructType",
59-
listToSeq(sfObjList))
59+
sfObjList)
6060
structType(stObj)
6161
}
6262

@@ -114,6 +114,35 @@ structField.jobj <- function(x) {
114114
obj
115115
}
116116

117+
checkType <- function(type) {
118+
primtiveTypes <- c("byte",
119+
"integer",
120+
"float",
121+
"double",
122+
"numeric",
123+
"character",
124+
"string",
125+
"binary",
126+
"raw",
127+
"logical",
128+
"boolean",
129+
"timestamp",
130+
"date")
131+
if (type %in% primtiveTypes) {
132+
return()
133+
} else {
134+
m <- regexec("^array<(.*)>$", type)
135+
matchedStrings <- regmatches(type, m)
136+
if (length(matchedStrings[[1]]) >= 2) {
137+
elemType <- matchedStrings[[1]][2]
138+
checkType(elemType)
139+
return()
140+
}
141+
}
142+
143+
stop(paste("Unsupported type for Dataframe:", type))
144+
}
145+
117146
structField.character <- function(x, type, nullable = TRUE) {
118147
if (class(x) != "character") {
119148
stop("Field name must be a string.")
@@ -124,28 +153,13 @@ structField.character <- function(x, type, nullable = TRUE) {
124153
if (class(nullable) != "logical") {
125154
stop("nullable must be either TRUE or FALSE")
126155
}
127-
options <- c("byte",
128-
"integer",
129-
"float",
130-
"double",
131-
"numeric",
132-
"character",
133-
"string",
134-
"binary",
135-
"raw",
136-
"logical",
137-
"boolean",
138-
"timestamp",
139-
"date")
140-
dataType <- if (type %in% options) {
141-
type
142-
} else {
143-
stop(paste("Unsupported type for Dataframe:", type))
144-
}
156+
157+
checkType(type)
158+
145159
sfObj <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
146160
"createStructField",
147161
x,
148-
dataType,
162+
type,
149163
nullable)
150164
structField(sfObj)
151165
}

R/pkg/R/utils.R

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -361,16 +361,6 @@ numToInt <- function(num) {
361361
as.integer(num)
362362
}
363363

364-
# create a Seq in JVM
365-
toSeq <- function(...) {
366-
callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", list(...))
367-
}
368-
369-
# create a Seq in JVM from a list
370-
listToSeq <- function(l) {
371-
callJStatic("org.apache.spark.sql.api.r.SQLUtils", "toSeq", l)
372-
}
373-
374364
# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
375365
# user defined function (UDF), and to examine variables in the UDF to decide
376366
# if their values should be included in the new function environment.

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,23 @@ mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
4949
jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp")
5050
writeLines(mockLinesNa, jsonPathNa)
5151

52+
# For test complex types in DataFrame
53+
mockLinesComplexType <-
54+
c("{\"c1\":[1, 2, 3], \"c2\":[\"a\", \"b\", \"c\"], \"c3\":[1.0, 2.0, 3.0]}",
55+
"{\"c1\":[4, 5, 6], \"c2\":[\"d\", \"e\", \"f\"], \"c3\":[4.0, 5.0, 6.0]}",
56+
"{\"c1\":[7, 8, 9], \"c2\":[\"g\", \"h\", \"i\"], \"c3\":[7.0, 8.0, 9.0]}")
57+
complexTypeJsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
58+
writeLines(mockLinesComplexType, complexTypeJsonPath)
59+
5260
test_that("infer types", {
5361
expect_equal(infer_type(1L), "integer")
5462
expect_equal(infer_type(1.0), "double")
5563
expect_equal(infer_type("abc"), "string")
5664
expect_equal(infer_type(TRUE), "boolean")
5765
expect_equal(infer_type(as.Date("2015-03-11")), "date")
5866
expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
59-
expect_equal(infer_type(c(1L, 2L)),
60-
list(type = "array", elementType = "integer", containsNull = TRUE))
61-
expect_equal(infer_type(list(1L, 2L)),
62-
list(type = "array", elementType = "integer", containsNull = TRUE))
67+
expect_equal(infer_type(c(1L, 2L)), "array<integer>")
68+
expect_equal(infer_type(list(1L, 2L)), "array<integer>")
6369
testStruct <- infer_type(list(a = 1L, b = "2"))
6470
expect_equal(class(testStruct), "structType")
6571
checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE)
@@ -236,8 +242,7 @@ test_that("create DataFrame with different data types", {
236242
expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
237243
})
238244

239-
# TODO: enable this test after fix serialization for nested object
240-
#test_that("create DataFrame with nested array and struct", {
245+
test_that("create DataFrame with nested array and struct", {
241246
# e <- new.env()
242247
# assign("n", 3L, envir = e)
243248
# l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L))
@@ -247,7 +252,32 @@ test_that("create DataFrame with different data types", {
247252
# expect_equal(count(df), 1)
248253
# ldf <- collect(df)
249254
# expect_equal(ldf[1,], l[[1]])
250-
#})
255+
256+
257+
# ArrayType only for now
258+
l <- list(as.list(1:10), list("a", "b"))
259+
df <- createDataFrame(sqlContext, list(l), c("a", "b"))
260+
expect_equal(dtypes(df), list(c("a", "array<int>"), c("b", "array<string>")))
261+
expect_equal(count(df), 1)
262+
ldf <- collect(df)
263+
expect_equal(names(ldf), c("a", "b"))
264+
expect_equal(ldf[1, 1][[1]], l[[1]])
265+
expect_equal(ldf[1, 2][[1]], l[[2]])
266+
})
267+
268+
test_that("Collect DataFrame with complex types", {
269+
# only ArrayType now
270+
# TODO: tests for StructType and MapType after they are supported
271+
df <- jsonFile(sqlContext, complexTypeJsonPath)
272+
273+
ldf <- collect(df)
274+
expect_equal(nrow(ldf), 3)
275+
expect_equal(ncol(ldf), 3)
276+
expect_equal(names(ldf), c("c1", "c2", "c3"))
277+
expect_equal(ldf$c1, list(list(1, 2, 3), list(4, 5, 6), list (7, 8, 9)))
278+
expect_equal(ldf$c2, list(list("a", "b", "c"), list("d", "e", "f"), list ("g", "h", "i")))
279+
expect_equal(ldf$c3, list(list(1.0, 2.0, 3.0), list(4.0, 5.0, 6.0), list (7.0, 8.0, 9.0)))
280+
})
251281

252282
test_that("jsonFile() on a local file returns a DataFrame", {
253283
df <- jsonFile(sqlContext, jsonPath)

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Apache Spark
22

33
Spark is a fast and general cluster computing system for Big Data. It provides
4-
high-level APIs in Scala, Java, and Python, and an optimized engine that
4+
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
55
supports general computation graphs for data analysis. It also supports a
66
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
77
MLlib for machine learning, GraphX for graph processing,
@@ -94,5 +94,5 @@ distribution.
9494

9595
## Configuration
9696

97-
Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)
97+
Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)
9898
in the online documentation for an overview on how to configure Spark.

0 commit comments

Comments
 (0)