Skip to content

Commit 3523e1f

Browse files
committed
Merge remote-tracking branch 'upstream/master' into SPARK-6328
2 parents afad086 + cf2e0ae commit 3523e1f

File tree

107 files changed

+3509
-1197
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+3509
-1197
lines changed

R/pkg/NAMESPACE

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export("setJobGroup",
2323
exportClasses("DataFrame")
2424

2525
exportMethods("arrange",
26+
"attach",
2627
"cache",
2728
"collect",
2829
"columns",
@@ -64,6 +65,7 @@ exportMethods("arrange",
6465
"repartition",
6566
"sample",
6667
"sample_frac",
68+
"sampleBy",
6769
"saveAsParquetFile",
6870
"saveAsTable",
6971
"saveDF",
@@ -228,7 +230,8 @@ exportMethods("agg")
228230
export("sparkRSQL.init",
229231
"sparkRHive.init")
230232

231-
export("cacheTable",
233+
export("as.DataFrame",
234+
"cacheTable",
232235
"clearCache",
233236
"createDataFrame",
234237
"createExternalTable",
@@ -252,4 +255,4 @@ export("structField",
252255
"structType.structField",
253256
"print.structType")
254257

255-
export("as.data.frame")
258+
export("as.data.frame")

R/pkg/R/DataFrame.R

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,9 +1414,10 @@ setMethod("where",
14141414
#' @param x A Spark DataFrame
14151415
#' @param y A Spark DataFrame
14161416
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
1417-
#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
1417+
#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
14181418
#' @param joinType The type of join to perform. The following join types are available:
1419-
#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
1419+
#' 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
1420+
#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
14201421
#' @return A DataFrame containing the result of the join operation.
14211422
#' @rdname join
14221423
#' @name join
@@ -1441,11 +1442,15 @@ setMethod("join",
14411442
if (is.null(joinType)) {
14421443
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
14431444
} else {
1444-
if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) {
1445+
if (joinType %in% c("inner", "outer", "full", "fullouter",
1446+
"leftouter", "left_outer", "left",
1447+
"rightouter", "right_outer", "right", "leftsemi")) {
1448+
joinType <- gsub("_", "", joinType)
14451449
sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
14461450
} else {
14471451
stop("joinType must be one of the following types: ",
1448-
"'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'")
1452+
"'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
1453+
'rightouter', 'right_outer', 'right', 'leftsemi'")
14491454
}
14501455
}
14511456
}
@@ -1826,17 +1831,15 @@ setMethod("fillna",
18261831
if (length(colNames) == 0 || !all(colNames != "")) {
18271832
stop("value should be an a named list with each name being a column name.")
18281833
}
1829-
1830-
# Convert to the named list to an environment to be passed to JVM
1831-
valueMap <- new.env()
1832-
for (col in colNames) {
1833-
# Check each item in the named list is of valid type
1834-
v <- value[[col]]
1834+
# Check each item in the named list is of valid type
1835+
lapply(value, function(v) {
18351836
if (!(class(v) %in% c("integer", "numeric", "character"))) {
18361837
stop("Each item in value should be an integer, numeric or charactor.")
18371838
}
1838-
valueMap[[col]] <- v
1839-
}
1839+
})
1840+
1841+
# Convert the named list to an environment to be passed to JVM
1842+
valueMap <- convertNamedListToEnv(value)
18401843

18411844
# When value is a named list, caller is expected not to pass in cols
18421845
if (!is.null(cols)) {
@@ -1881,3 +1884,33 @@ setMethod("as.data.frame",
18811884
}
18821885
collect(x)
18831886
})
1887+
1888+
#' The specified DataFrame is attached to the R search path. This means that
1889+
#' the DataFrame is searched by R when evaluating a variable, so columns in
1890+
#' the DataFrame can be accessed by simply giving their names.
1891+
#'
1892+
#' @rdname attach
1893+
#' @title Attach DataFrame to R search path
1894+
#' @param what (DataFrame) The DataFrame to attach
1895+
#' @param pos (integer) Specify position in search() where to attach.
1896+
#' @param name (character) Name to use for the attached DataFrame. Names
1897+
#' starting with package: are reserved for library.
1898+
#' @param warn.conflicts (logical) If TRUE, warnings are printed about conflicts
1899+
#' from attaching the database, unless that DataFrame contains an object named \code{.conflicts.OK}.
1900+
#' @examples
1901+
#' \dontrun{
1902+
#' attach(irisDf)
1903+
#' summary(Sepal_Width)
1904+
#' }
1905+
#' @seealso \link{detach}
1906+
setMethod("attach",
1907+
signature(what = "DataFrame"),
1908+
function(what, pos = 2, name = deparse(substitute(what)), warn.conflicts = TRUE) {
1909+
cols <- columns(what)
1910+
stopifnot(length(cols) > 0)
1911+
newEnv <- new.env()
1912+
for (i in 1:length(cols)) {
1913+
assign(x = cols[i], value = what[, cols[i]], envir = newEnv)
1914+
}
1915+
attach(newEnv, pos = pos, name = name, warn.conflicts = warn.conflicts)
1916+
})

R/pkg/R/SQLContext.R

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ infer_type <- function(x) {
3232
numeric = "double",
3333
raw = "binary",
3434
list = "array",
35+
struct = "struct",
3536
environment = "map",
3637
Date = "date",
3738
POSIXlt = "timestamp",
@@ -44,39 +45,42 @@ infer_type <- function(x) {
4445
paste0("map<string,", infer_type(get(key, x)), ">")
4546
} else if (type == "array") {
4647
stopifnot(length(x) > 0)
48+
49+
paste0("array<", infer_type(x[[1]]), ">")
50+
} else if (type == "struct") {
51+
stopifnot(length(x) > 0)
4752
names <- names(x)
48-
if (is.null(names)) {
49-
paste0("array<", infer_type(x[[1]]), ">")
50-
} else {
51-
# StructType
52-
types <- lapply(x, infer_type)
53-
fields <- lapply(1:length(x), function(i) {
54-
structField(names[[i]], types[[i]], TRUE)
55-
})
56-
do.call(structType, fields)
57-
}
53+
stopifnot(!is.null(names))
54+
55+
type <- lapply(seq_along(x), function(i) {
56+
paste0(names[[i]], ":", infer_type(x[[i]]), ",")
57+
})
58+
type <- Reduce(paste0, type)
59+
type <- paste0("struct<", substr(type, 1, nchar(type) - 1), ">")
5860
} else if (length(x) > 1) {
5961
paste0("array<", infer_type(x[[1]]), ">")
6062
} else {
6163
type
6264
}
6365
}
6466

65-
#' Create a DataFrame from an RDD
67+
#' Create a DataFrame
6668
#'
67-
#' Converts an RDD to a DataFrame by infer the types.
69+
#' Converts R data.frame or list into DataFrame.
6870
#'
6971
#' @param sqlContext A SQLContext
7072
#' @param data An RDD or list or data.frame
7173
#' @param schema a list of column names or named list (StructType), optional
7274
#' @return a DataFrame
75+
#' @rdname createDataFrame
7376
#' @export
7477
#' @examples
7578
#'\dontrun{
7679
#' sc <- sparkR.init()
7780
#' sqlContext <- sparkRSQL.init(sc)
78-
#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
79-
#' df <- createDataFrame(sqlContext, rdd)
81+
#' df1 <- as.DataFrame(sqlContext, iris)
82+
#' df2 <- as.DataFrame(sqlContext, list(3,4,5,6))
83+
#' df3 <- createDataFrame(sqlContext, iris)
8084
#' }
8185

8286
# TODO(davies): support sampling and infer type from NA
@@ -149,6 +153,13 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
149153
dataFrame(sdf)
150154
}
151155

156+
#' @rdname createDataFrame
157+
#' @aliases createDataFrame
158+
#' @export
159+
as.DataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
160+
createDataFrame(sqlContext, data, schema, samplingRatio)
161+
}
162+
152163
# toDF
153164
#
154165
# Converts an RDD to a DataFrame by inferring the types.

R/pkg/R/deserialize.R

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ readTypedObject <- function(con, type) {
5151
"a" = readArray(con),
5252
"l" = readList(con),
5353
"e" = readEnv(con),
54+
"s" = readStruct(con),
5455
"n" = NULL,
5556
"j" = getJobj(readString(con)),
5657
stop(paste("Unsupported type for deserialization", type)))
@@ -135,6 +136,15 @@ readEnv <- function(con) {
135136
env
136137
}
137138

139+
# Read a field of StructType from DataFrame
140+
# into a named list in R whose class is "struct"
141+
readStruct <- function(con) {
142+
names <- readObject(con)
143+
fields <- readObject(con)
144+
names(fields) <- names
145+
listToStruct(fields)
146+
}
147+
138148
readRaw <- function(con) {
139149
dataLen <- readInt(con)
140150
readBin(con, raw(), as.integer(dataLen), endian = "big")

R/pkg/R/generics.R

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,10 @@ setGeneric("sample",
509509
setGeneric("sample_frac",
510510
function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })
511511

512+
#' @rdname statfunctions
513+
#' @export
514+
setGeneric("sampleBy", function(x, col, fractions, seed) { standardGeneric("sampleBy") })
515+
512516
#' @rdname saveAsParquetFile
513517
#' @export
514518
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
@@ -1003,3 +1007,7 @@ setGeneric("rbind", signature = "...")
10031007
#' @rdname as.data.frame
10041008
#' @export
10051009
setGeneric("as.data.frame")
1010+
1011+
#' @rdname attach
1012+
#' @export
1013+
setGeneric("attach")

R/pkg/R/schema.R

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ checkType <- function(type) {
136136
switch (firstChar,
137137
a = {
138138
# Array type
139-
m <- regexec("^array<(.*)>$", type)
139+
m <- regexec("^array<(.+)>$", type)
140140
matchedStrings <- regmatches(type, m)
141141
if (length(matchedStrings[[1]]) >= 2) {
142142
elemType <- matchedStrings[[1]][2]
@@ -146,7 +146,7 @@ checkType <- function(type) {
146146
},
147147
m = {
148148
# Map type
149-
m <- regexec("^map<(.*),(.*)>$", type)
149+
m <- regexec("^map<(.+),(.+)>$", type)
150150
matchedStrings <- regmatches(type, m)
151151
if (length(matchedStrings[[1]]) >= 3) {
152152
keyType <- matchedStrings[[1]][2]
@@ -157,6 +157,30 @@ checkType <- function(type) {
157157
checkType(valueType)
158158
return()
159159
}
160+
},
161+
s = {
162+
# Struct type
163+
m <- regexec("^struct<(.+)>$", type)
164+
matchedStrings <- regmatches(type, m)
165+
if (length(matchedStrings[[1]]) >= 2) {
166+
fieldsString <- matchedStrings[[1]][2]
167+
# strsplit does not return the final empty string, so check if
168+
# the final char is ","
169+
if (substr(fieldsString, nchar(fieldsString), nchar(fieldsString)) != ",") {
170+
fields <- strsplit(fieldsString, ",")[[1]]
171+
for (field in fields) {
172+
m <- regexec("^(.+):(.+)$", field)
173+
matchedStrings <- regmatches(field, m)
174+
if (length(matchedStrings[[1]]) >= 3) {
175+
fieldType <- matchedStrings[[1]][3]
176+
checkType(fieldType)
177+
} else {
178+
break
179+
}
180+
}
181+
return()
182+
}
183+
}
160184
})
161185
}
162186

R/pkg/R/serialize.R

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,21 @@
3232
# environment -> Map[String, T], where T is a native type
3333
# jobj -> Object, where jobj is an object created in the backend
3434

35+
getSerdeType <- function(object) {
36+
type <- class(object)[[1]]
37+
if (type != "list") {
38+
type
39+
} else {
40+
# Check if all elements are of same type
41+
elemType <- unique(sapply(object, function(elem) { getSerdeType(elem) }))
42+
if (length(elemType) <= 1) {
43+
"array"
44+
} else {
45+
"list"
46+
}
47+
}
48+
}
49+
3550
writeObject <- function(con, object, writeType = TRUE) {
3651
# NOTE: In R vectors have same type as objects. So we don't support
3752
# passing in vectors as arrays and instead require arrays to be passed
@@ -45,18 +60,22 @@ writeObject <- function(con, object, writeType = TRUE) {
4560
type <- "NULL"
4661
}
4762
}
63+
64+
serdeType <- getSerdeType(object)
4865
if (writeType) {
49-
writeType(con, type)
66+
writeType(con, serdeType)
5067
}
51-
switch(type,
68+
switch(serdeType,
5269
NULL = writeVoid(con),
5370
integer = writeInt(con, object),
5471
character = writeString(con, object),
5572
logical = writeBoolean(con, object),
5673
double = writeDouble(con, object),
5774
numeric = writeDouble(con, object),
5875
raw = writeRaw(con, object),
76+
array = writeArray(con, object),
5977
list = writeList(con, object),
78+
struct = writeList(con, object),
6079
jobj = writeJobj(con, object),
6180
environment = writeEnv(con, object),
6281
Date = writeDate(con, object),
@@ -110,7 +129,7 @@ writeRowSerialize <- function(outputCon, rows) {
110129
serializeRow <- function(row) {
111130
rawObj <- rawConnection(raw(0), "wb")
112131
on.exit(close(rawObj))
113-
writeGenericList(rawObj, row)
132+
writeList(rawObj, row)
114133
rawConnectionValue(rawObj)
115134
}
116135

@@ -128,7 +147,9 @@ writeType <- function(con, class) {
128147
double = "d",
129148
numeric = "d",
130149
raw = "r",
150+
array = "a",
131151
list = "l",
152+
struct = "s",
132153
jobj = "j",
133154
environment = "e",
134155
Date = "D",
@@ -139,15 +160,13 @@ writeType <- function(con, class) {
139160
}
140161

141162
# Used to pass arrays where all the elements are of the same type
142-
writeList <- function(con, arr) {
143-
# All elements should be of same type
144-
elemType <- unique(sapply(arr, function(elem) { class(elem) }))
145-
stopifnot(length(elemType) <= 1)
146-
163+
writeArray <- function(con, arr) {
147164
# TODO: Empty lists are given type "character" right now.
148165
# This may not work if the Java side expects array of any other type.
149-
if (length(elemType) == 0) {
166+
if (length(arr) == 0) {
150167
elemType <- class("somestring")
168+
} else {
169+
elemType <- getSerdeType(arr[[1]])
151170
}
152171

153172
writeType(con, elemType)
@@ -161,7 +180,7 @@ writeList <- function(con, arr) {
161180
}
162181

163182
# Used to pass arrays where the elements can be of different types
164-
writeGenericList <- function(con, list) {
183+
writeList <- function(con, list) {
165184
writeInt(con, length(list))
166185
for (elem in list) {
167186
writeObject(con, elem)
@@ -174,9 +193,9 @@ writeEnv <- function(con, env) {
174193

175194
writeInt(con, len)
176195
if (len > 0) {
177-
writeList(con, as.list(ls(env)))
196+
writeArray(con, as.list(ls(env)))
178197
vals <- lapply(ls(env), function(x) { env[[x]] })
179-
writeGenericList(con, as.list(vals))
198+
writeList(con, as.list(vals))
180199
}
181200
}
182201

0 commit comments

Comments
 (0)