Commit bd9cd94

Merge remote-tracking branch 'apache-github/master' into trackStateByKey
2 parents: b7c653d + e1a897b

File tree: 619 files changed (+20222 / -8883 lines)


LICENSE

Lines changed: 1 addition & 1 deletion
@@ -265,7 +265,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
 (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
 (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.8.2.1 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
 (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
 (BSD licence) sbt and sbt-launch-lib.bash
 (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

R/pkg/DESCRIPTION

Lines changed: 1 addition & 0 deletions
@@ -33,4 +33,5 @@ Collate:
 'mllib.R'
 'serialize.R'
 'sparkR.R'
+'stats.R'
 'utils.R'

R/pkg/NAMESPACE

Lines changed: 9 additions & 2 deletions
@@ -23,10 +23,13 @@ export("setJobGroup",
 exportClasses("DataFrame")

 exportMethods("arrange",
+              "attach",
               "cache",
               "collect",
               "columns",
               "count",
+              "cov",
+              "corr",
               "crosstab",
               "describe",
               "dim",
@@ -38,6 +41,7 @@ exportMethods("arrange",
               "fillna",
               "filter",
               "first",
+              "freqItems",
               "group_by",
               "groupBy",
               "head",
@@ -61,6 +65,7 @@ exportMethods("arrange",
               "repartition",
               "sample",
               "sample_frac",
+              "sampleBy",
               "saveAsParquetFile",
               "saveAsTable",
               "saveDF",
@@ -104,6 +109,7 @@ exportMethods("%in%",
              "cbrt",
              "ceil",
              "ceiling",
+             "column",
              "concat",
              "concat_ws",
              "contains",
@@ -224,7 +230,8 @@ exportMethods("agg")
 export("sparkRSQL.init",
        "sparkRHive.init")

-export("cacheTable",
+export("as.DataFrame",
+       "cacheTable",
        "clearCache",
        "createDataFrame",
        "createExternalTable",
@@ -248,4 +255,4 @@ export("structField",
        "structType.structField",
        "print.structType")

-export("as.data.frame")
+export("as.data.frame")

R/pkg/R/DataFrame.R

Lines changed: 85 additions & 52 deletions
@@ -1298,8 +1298,10 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' Sort a DataFrame by the specified column(s).
 #'
 #' @param x A DataFrame to be sorted.
-#' @param col Either a Column object or character vector indicating the field to sort on
+#' @param col A character or Column object vector indicating the fields to sort on
 #' @param ... Additional sorting fields
+#' @param decreasing A logical argument indicating sorting order for columns when
+#'                   a character vector is specified for col
 #' @return A DataFrame where all elements are sorted.
 #' @rdname arrange
 #' @name arrange
@@ -1312,23 +1314,52 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlContext, path)
 #' arrange(df, df$col1)
-#' arrange(df, "col1")
 #' arrange(df, asc(df$col1), desc(abs(df$col2)))
+#' arrange(df, "col1", decreasing = TRUE)
+#' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
 #' }
 setMethod("arrange",
-          signature(x = "DataFrame", col = "characterOrColumn"),
+          signature(x = "DataFrame", col = "Column"),
           function(x, col, ...) {
-            if (class(col) == "character") {
-              sdf <- callJMethod(x@sdf, "sort", col, list(...))
-            } else if (class(col) == "Column") {
             jcols <- lapply(list(col, ...), function(c) {
               c@jc
             })
-              sdf <- callJMethod(x@sdf, "sort", jcols)
-            }
+
+            sdf <- callJMethod(x@sdf, "sort", jcols)
             dataFrame(sdf)
           })

+#' @rdname arrange
+#' @export
+setMethod("arrange",
+          signature(x = "DataFrame", col = "character"),
+          function(x, col, ..., decreasing = FALSE) {
+
+            # all sorting columns
+            by <- list(col, ...)
+
+            if (length(decreasing) == 1) {
+              # in case only 1 boolean argument - decreasing value is specified,
+              # it will be used for all columns
+              decreasing <- rep(decreasing, length(by))
+            } else if (length(decreasing) != length(by)) {
+              stop("Arguments 'col' and 'decreasing' must have the same length")
+            }
+
+            # builds a list of columns of type Column
+            # example: [[1]] Column Species ASC
+            #          [[2]] Column Petal_Length DESC
+            jcols <- lapply(seq_len(length(decreasing)), function(i){
+              if (decreasing[[i]]) {
+                desc(getColumn(x, by[[i]]))
+              } else {
+                asc(getColumn(x, by[[i]]))
+              }
+            })
+
+            do.call("arrange", c(x, jcols))
+          })
+
 #' @rdname arrange
 #' @name orderby
 setMethod("orderBy",
@@ -1383,9 +1414,10 @@ setMethod("where",
 #' @param x A Spark DataFrame
 #' @param y A Spark DataFrame
 #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
-#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
+#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
 #' @param joinType The type of join to perform. The following join types are available:
-#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
+#' 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
+#' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
 #' @return A DataFrame containing the result of the join operation.
 #' @rdname join
 #' @name join
@@ -1410,11 +1442,15 @@ setMethod("join",
           if (is.null(joinType)) {
             sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
           } else {
-            if (joinType %in% c("inner", "outer", "left_outer", "right_outer", "semijoin")) {
+            if (joinType %in% c("inner", "outer", "full", "fullouter",
+                                "leftouter", "left_outer", "left",
+                                "rightouter", "right_outer", "right", "leftsemi")) {
+              joinType <- gsub("_", "", joinType)
               sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
             } else {
               stop("joinType must be one of the following types: ",
-                   "'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'")
+                   "'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left_outer', 'left',
+                    'rightouter', 'right_outer', 'right', 'leftsemi'")
             }
           }
         }
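The widened joinType check also normalizes the underscore spellings via gsub("_", "", joinType) before the string reaches the JVM, so old and new aliases should behave identically. A sketch with hypothetical DataFrames df1 and df2 sharing a key column:

    joined1 <- join(df1, df2, df1$key == df2$key, "left_outer")
    joined2 <- join(df1, df2, df1$key == df2$key, "leftouter")  # same join type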
@@ -1795,17 +1831,15 @@ setMethod("fillna",
           if (length(colNames) == 0 || !all(colNames != "")) {
             stop("value should be an a named list with each name being a column name.")
           }
-
-          # Convert to the named list to an environment to be passed to JVM
-          valueMap <- new.env()
-          for (col in colNames) {
-            # Check each item in the named list is of valid type
-            v <- value[[col]]
+          # Check each item in the named list is of valid type
+          lapply(value, function(v) {
             if (!(class(v) %in% c("integer", "numeric", "character"))) {
               stop("Each item in value should be an integer, numeric or charactor.")
             }
-            valueMap[[col]] <- v
-          }
+          })
+
+          # Convert to the named list to an environment to be passed to JVM
+          valueMap <- convertNamedListToEnv(value)

           # When value is a named list, caller is expected not to pass in cols
           if (!is.null(cols)) {
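The refactoring only moves the list-to-environment conversion into the convertNamedListToEnv() helper; fillna's behavior is unchanged. A usage sketch with a hypothetical DataFrame df that has a numeric "age" column and a character "name" column:

    # replace NAs with 0 in "age" and with "unknown" in "name"
    cleaned <- fillna(df, list(age = 0, name = "unknown"))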
@@ -1828,36 +1862,6 @@ setMethod("fillna",
           dataFrame(sdf)
         })

-#' crosstab
-#'
-#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
-#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
-#' non-zero pair frequencies will be returned.
-#'
-#' @param col1 name of the first column. Distinct items will make the first item of each row.
-#' @param col2 name of the second column. Distinct items will make the column names of the output.
-#' @return a local R data.frame representing the contingency table. The first column of each row
-#'         will be the distinct values of `col1` and the column names will be the distinct values
-#'         of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
-#'         occurrences will have zero as their counts.
-#'
-#' @rdname statfunctions
-#' @name crosstab
-#' @export
-#' @examples
-#' \dontrun{
-#' df <- jsonFile(sqlCtx, "/path/to/file.json")
-#' ct = crosstab(df, "title", "gender")
-#' }
-setMethod("crosstab",
-          signature(x = "DataFrame", col1 = "character", col2 = "character"),
-          function(x, col1, col2) {
-            statFunctions <- callJMethod(x@sdf, "stat")
-            sct <- callJMethod(statFunctions, "crosstab", col1, col2)
-            collect(dataFrame(sct))
-          })
-
-
 #' This function downloads the contents of a DataFrame into an R's data.frame.
 #' Since data.frames are held in memory, ensure that you have enough memory
 #' in your system to accommodate the contents.
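Note that crosstab is removed only from DataFrame.R; NAMESPACE still exports it, and DESCRIPTION adds the new 'stats.R', so the method has presumably been relocated there alongside the other stat functions this commit exports (cov, corr, freqItems, sampleBy). Its interface, per the removed roxygen example, is unchanged:

    df <- jsonFile(sqlContext, "/path/to/file.json")
    ct <- crosstab(df, "title", "gender")  # local R data.frame contingency table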
@@ -1879,5 +1883,34 @@ setMethod("as.data.frame",
             stop(paste("Unused argument(s): ", paste(list(...), collapse=", ")))
           }
           collect(x)
-  }
-)
+          })
+
+#' The specified DataFrame is attached to the R search path. This means that
+#' the DataFrame is searched by R when evaluating a variable, so columns in
+#' the DataFrame can be accessed by simply giving their names.
+#'
+#' @rdname attach
+#' @title Attach DataFrame to R search path
+#' @param what (DataFrame) The DataFrame to attach
+#' @param pos (integer) Specify position in search() where to attach.
+#' @param name (character) Name to use for the attached DataFrame. Names
+#'             starting with package: are reserved for library.
+#' @param warn.conflicts (logical) If TRUE, warnings are printed about conflicts
+#' from attaching the database, unless that DataFrame contains an object
+#' @examples
+#' \dontrun{
+#' attach(irisDf)
+#' summary(Sepal_Width)
+#' }
+#' @seealso \link{detach}
+setMethod("attach",
+          signature(what = "DataFrame"),
+          function(what, pos = 2, name = deparse(substitute(what)), warn.conflicts = TRUE) {
+            cols <- columns(what)
+            stopifnot(length(cols) > 0)
+            newEnv <- new.env()
+            for (i in 1:length(cols)) {
+              assign(x = cols[i], value = what[, cols[i]], envir = newEnv)
+            }
+            attach(newEnv, pos = pos, name = name, warn.conflicts = warn.conflicts)
+          })
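A usage sketch for the new attach() method, following its roxygen example; irisDf is a hypothetical DataFrame built from iris:

    irisDf <- createDataFrame(sqlContext, iris)
    attach(irisDf)         # columns become visible on the R search path
    summary(Sepal_Width)   # resolves to irisDf$Sepal_Width
    detach("irisDf")       # remove it from the search path when done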

R/pkg/R/SQLContext.R

Lines changed: 25 additions & 14 deletions
@@ -32,6 +32,7 @@ infer_type <- function(x) {
                 numeric = "double",
                 raw = "binary",
                 list = "array",
+                struct = "struct",
                 environment = "map",
                 Date = "date",
                 POSIXlt = "timestamp",
@@ -44,39 +45,42 @@ infer_type <- function(x) {
     paste0("map<string,", infer_type(get(key, x)), ">")
   } else if (type == "array") {
     stopifnot(length(x) > 0)
+
+    paste0("array<", infer_type(x[[1]]), ">")
+  } else if (type == "struct") {
+    stopifnot(length(x) > 0)
     names <- names(x)
-    if (is.null(names)) {
-      paste0("array<", infer_type(x[[1]]), ">")
-    } else {
-      # StructType
-      types <- lapply(x, infer_type)
-      fields <- lapply(1:length(x), function(i) {
-        structField(names[[i]], types[[i]], TRUE)
-      })
-      do.call(structType, fields)
-    }
+    stopifnot(!is.null(names))
+
+    type <- lapply(seq_along(x), function(i) {
+      paste0(names[[i]], ":", infer_type(x[[i]]), ",")
+    })
+    type <- Reduce(paste0, type)
+    type <- paste0("struct<", substr(type, 1, nchar(type) - 1), ">")
   } else if (length(x) > 1) {
     paste0("array<", infer_type(x[[1]]), ">")
   } else {
     type
   }
 }

-#' Create a DataFrame from an RDD
+#' Create a DataFrame
 #'
-#' Converts an RDD to a DataFrame by infer the types.
+#' Converts R data.frame or list into DataFrame.
 #'
 #' @param sqlContext A SQLContext
 #' @param data An RDD or list or data.frame
 #' @param schema a list of column names or named list (StructType), optional
 #' @return an DataFrame
+#' @rdname createDataFrame
 #' @export
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlContext <- sparkRSQL.init(sc)
-#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
-#' df <- createDataFrame(sqlContext, rdd)
+#' df1 <- as.DataFrame(sqlContext, iris)
+#' df2 <- as.DataFrame(sqlContext, list(3,4,5,6))
+#' df3 <- createDataFrame(sqlContext, iris)
 #' }

 # TODO(davies): support sampling and infer type from NA
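Under the new branch, a named list is rendered as a struct type string only when it carries the "struct" S3 class (see the listToStruct() helper used by the deserializer changes below); a plain list still infers as an array. Assuming SparkR's usual class-to-type mapping (integer to "integer", character to "string"), an internal call would behave roughly like:

    s <- listToStruct(list(a = 1L, b = "text"))
    SparkR:::infer_type(s)               # expected: "struct<a:integer,b:string>"
    SparkR:::infer_type(list(3, 4, 5))   # expected: "array<double>"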
@@ -149,6 +153,13 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
   dataFrame(sdf)
 }

+#' @rdname createDataFrame
+#' @aliases createDataFrame
+#' @export
+as.DataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
+  createDataFrame(sqlContext, data, schema, samplingRatio)
+}
+
 # toDF
 #
 # Converts an RDD to a DataFrame by infer the types.
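as.DataFrame is a thin alias for createDataFrame, so the two are interchangeable, as in the updated roxygen examples above:

    sc <- sparkR.init()
    sqlContext <- sparkRSQL.init(sc)
    df1 <- as.DataFrame(sqlContext, iris)   # same as createDataFrame(sqlContext, iris)
    df2 <- as.DataFrame(sqlContext, list(3, 4, 5, 6))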

R/pkg/R/column.R

Lines changed: 5 additions & 7 deletions
@@ -36,13 +36,11 @@ setMethod("initialize", "Column", function(.Object, jc) {
   .Object
 })

-column <- function(jc) {
-  new("Column", jc)
-}
-
-col <- function(x) {
-  column(callJStatic("org.apache.spark.sql.functions", "col", x))
-}
+setMethod("column",
+          signature(x = "jobj"),
+          function(x) {
+            new("Column", x)
+          })

 #' @rdname show
 #' @name show

R/pkg/R/deserialize.R

Lines changed: 10 additions & 0 deletions
@@ -51,6 +51,7 @@ readTypedObject <- function(con, type) {
          "a" = readArray(con),
          "l" = readList(con),
          "e" = readEnv(con),
+         "s" = readStruct(con),
          "n" = NULL,
          "j" = getJobj(readString(con)),
          stop(paste("Unsupported type for deserialization", type)))
@@ -135,6 +136,15 @@ readEnv <- function(con) {
   env
 }

+# Read a field of StructType from DataFrame
+# into a named list in R whose class is "struct"
+readStruct <- function(con) {
+  names <- readObject(con)
+  fields <- readObject(con)
+  names(fields) <- names
+  listToStruct(fields)
+}
+
 readRaw <- function(con) {
   dataLen <- readInt(con)
   readBin(con, raw(), as.integer(dataLen), endian = "big")
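readStruct() relies on a listToStruct() helper that is not part of this diff (it lives in R/pkg/R/utils.R). A minimal sketch of what it plausibly does, consistent with how readStruct() and infer_type() use it: tagging a named list with the S3 class "struct".

    listToStruct <- function(list) {
      stopifnot(class(list) == "list")
      stopifnot(!is.null(names(list)))
      class(list) <- "struct"
      list
    }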
