
Commit 5b6c70e

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into rdd-callsite
2 parents: 61baf10 + 643c49c

File tree: 317 files changed (+8046 / -4600 lines)


R/pkg/NAMESPACE

Lines changed: 8 additions & 0 deletions
@@ -119,12 +119,14 @@ exportMethods("%in%",
               "count",
               "countDistinct",
               "crc32",
+              "cumeDist",
               "date_add",
               "date_format",
               "date_sub",
               "datediff",
               "dayofmonth",
               "dayofyear",
+              "denseRank",
               "desc",
               "endsWith",
               "exp",
@@ -150,8 +152,10 @@ exportMethods("%in%",
               "isNaN",
               "isNotNull",
               "isNull",
+              "lag",
               "last",
               "last_day",
+              "lead",
               "least",
               "length",
               "levenshtein",
@@ -177,17 +181,21 @@ exportMethods("%in%",
               "nanvl",
               "negate",
               "next_day",
+              "ntile",
               "otherwise",
+              "percentRank",
               "pmod",
               "quarter",
               "rand",
               "randn",
+              "rank",
               "regexp_extract",
               "regexp_replace",
               "reverse",
               "rint",
               "rlike",
               "round",
+              "rowNumber",
               "rpad",
               "rtrim",
               "second",

R/pkg/R/DataFrame.R

Lines changed: 150 additions & 19 deletions
@@ -357,7 +357,7 @@ setMethod("cache",
 #'
 #' Persist this DataFrame with the specified storage level. For details of the
 #' supported storage levels, refer to
-#' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.
+#' \url{http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence}.
 #'
 #' @param x The DataFrame to persist
 #' @rdname persist
@@ -1457,15 +1457,147 @@ setMethod("join",
             dataFrame(sdf)
           })
 
-#' @rdname merge
+#'
 #' @name merge
 #' @aliases join
+#' @title Merges two data frames
+#' @param x the first data frame to be joined
+#' @param y the second data frame to be joined
+#' @param by a character vector specifying the join columns. If by is not
+#'   specified, the common column names in \code{x} and \code{y} will be used.
+#' @param by.x a character vector specifying the joining columns for x.
+#' @param by.y a character vector specifying the joining columns for y.
+#' @param all.x a boolean value indicating whether all the rows in x should
+#'   be including in the join
+#' @param all.y a boolean value indicating whether all the rows in y should
+#'   be including in the join
+#' @param sort a logical argument indicating whether the resulting columns should be sorted
+#' @details If all.x and all.y are set to FALSE, a natural join will be returned. If
+#'   all.x is set to TRUE and all.y is set to FALSE, a left outer join will
+#'   be returned. If all.x is set to FALSE and all.y is set to TRUE, a right
+#'   outer join will be returned. If all.x and all.y are set to TRUE, a full
+#'   outer join will be returned.
+#' @rdname merge
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlContext, path)
+#' df2 <- jsonFile(sqlContext, path2)
+#' merge(df1, df2) # Performs a Cartesian
+#' merge(df1, df2, by = "col1") # Performs an inner join based on expression
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
+#' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
+#' }
 setMethod("merge",
           signature(x = "DataFrame", y = "DataFrame"),
-          function(x, y, joinExpr = NULL, joinType = NULL, ...) {
-            join(x, y, joinExpr, joinType)
-          })
+          function(x, y, by = intersect(names(x), names(y)), by.x = by, by.y = by,
+                   all = FALSE, all.x = all, all.y = all,
+                   sort = TRUE, suffixes = c("_x","_y"), ... ) {
+
+            if (length(suffixes) != 2) {
+              stop("suffixes must have length 2")
+            }
+
+            # join type is identified based on the values of all, all.x and all.y
+            # default join type is inner, according to R it should be natural but since it
+            # is not supported in spark inner join is used
+            joinType <- "inner"
+            if (all || (all.x && all.y)) {
+              joinType <- "outer"
+            } else if (all.x) {
+              joinType <- "left_outer"
+            } else if (all.y) {
+              joinType <- "right_outer"
+            }
 
+            # join expression is based on by.x, by.y if both by.x and by.y are not missing
+            # or on by, if by.x or by.y are missing or have different lengths
+            if (length(by.x) > 0 && length(by.x) == length(by.y)) {
+              joinX <- by.x
+              joinY <- by.y
+            } else if (length(by) > 0) {
+              # if join columns have the same name for both dataframes,
+              # they are used in join expression
+              joinX <- by
+              joinY <- by
+            } else {
+              # if by or both by.x and by.y have length 0, use Cartesian Product
+              joinRes <- join(x, y)
+              return (joinRes)
+            }
+
+            # sets alias for making colnames unique in dataframes 'x' and 'y'
+            colsX <- generateAliasesForIntersectedCols(x, by, suffixes[1])
+            colsY <- generateAliasesForIntersectedCols(y, by, suffixes[2])
+
+            # selects columns with their aliases from dataframes
+            # in case same column names are present in both data frames
+            xsel <- select(x, colsX)
+            ysel <- select(y, colsY)
+
+            # generates join conditions and adds them into a list
+            # it also considers alias names of the columns while generating join conditions
+            joinColumns <- lapply(seq_len(length(joinX)), function(i) {
+              colX <- joinX[[i]]
+              colY <- joinY[[i]]
+
+              if (colX %in% by) {
+                colX <- paste(colX, suffixes[1], sep = "")
+              }
+              if (colY %in% by) {
+                colY <- paste(colY, suffixes[2], sep = "")
+              }
+
+              colX <- getColumn(xsel, colX)
+              colY <- getColumn(ysel, colY)
+
+              colX == colY
+            })
+
+            # concatenates join columns with '&' and executes join
+            joinExpr <- Reduce("&", joinColumns)
+            joinRes <- join(xsel, ysel, joinExpr, joinType)
+
+            # sorts the result by 'by' columns if sort = TRUE
+            if (sort && length(by) > 0) {
+              colNameWithSuffix <- paste(by, suffixes[2], sep = "")
+              joinRes <- do.call("arrange", c(joinRes, colNameWithSuffix, decreasing = FALSE))
+            }
+
+            joinRes
+          })
+
+#'
+#' Creates a list of columns by replacing the intersected ones with aliases.
+#' The name of the alias column is formed by concatanating the original column name and a suffix.
+#'
+#' @param x a DataFrame on which the
+#' @param intersectedColNames a list of intersected column names
+#' @param suffix a suffix for the column name
+#' @return list of columns
+#'
+generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
+  allColNames <- names(x)
+  # sets alias for making colnames unique in dataframe 'x'
+  cols <- lapply(allColNames, function(colName) {
+    col <- getColumn(x, colName)
+    if (colName %in% intersectedColNames) {
+      newJoin <- paste(colName, suffix, sep = "")
+      if (newJoin %in% allColNames){
+        stop ("The following column name: ", newJoin, " occurs more than once in the 'DataFrame'.",
+              "Please use different suffixes for the intersected columns.")
+      }
+      col <- alias(col, newJoin)
+    }
+    col
+  })
+  cols
+}
 
 #' UnionAll
 #'
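For readers skimming the hunk above: the new merge() aliases every intersected column with the supplied suffixes before joining, then picks the join type from all.x/all.y as described in the @details block. Below is a minimal sketch of the resulting behavior; the table and column names are made up for illustration.

# Illustrative only: default suffixes are c("_x", "_y"), per the signature above.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
employees <- createDataFrame(sqlContext,
                             data.frame(name = c("ann", "bob"),
                                        dept = c("eng", "ops"),
                                        stringsAsFactors = FALSE))
departments <- createDataFrame(sqlContext,
                               data.frame(dept = c("eng", "ops"),
                                          floor = c(1, 2),
                                          stringsAsFactors = FALSE))

# Inner join on the shared column; "dept" survives on both sides as "dept_x" and "dept_y".
joined <- merge(employees, departments, by = "dept")
columns(joined)   # expected: "name", "dept_x", "dept_y", "floor"

# all.x = TRUE maps to a left outer join, all.y = TRUE to a right outer join,
# and both TRUE (or all = TRUE) to a full outer join.
left <- merge(employees, departments, by = "dept", all.x = TRUE)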
@@ -1572,18 +1704,17 @@ setMethod("except",
 #' spark.sql.sources.default will be used.
 #'
 #' Additionally, mode is used to specify the behavior of the save operation when
-#' data already exists in the data source. There are four modes:
-#' append: Contents of this DataFrame are expected to be appended to existing data.
-#' overwrite: Existing data is expected to be overwritten by the contents of
-# this DataFrame.
-#' error: An exception is expected to be thrown.
+#' data already exists in the data source. There are four modes: \cr
+#' append: Contents of this DataFrame are expected to be appended to existing data. \cr
+#' overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
+#' error: An exception is expected to be thrown. \cr
 #' ignore: The save operation is expected to not save the contents of the DataFrame
-# and to not change the existing data.
+#' and to not change the existing data. \cr
 #'
 #' @param df A SparkSQL DataFrame
 #' @param path A name for the table
 #' @param source A name for external data source
-#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
 #'
 #' @rdname write.df
 #' @name write.df
@@ -1596,6 +1727,7 @@ setMethod("except",
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlContext, path)
 #' write.df(df, "myfile", "parquet", "overwrite")
+#' saveDF(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
 #' }
 setMethod("write.df",
           signature(df = "DataFrame", path = "character"),
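The rewritten documentation above lists the four save modes on separate lines (via \cr). As a short illustration of how the mode argument behaves against existing output; the paths here are placeholders only.

# Illustrative only: "error" (the default) fails if the path already exists,
# "overwrite" replaces it, "append" adds to it, and "ignore" leaves it untouched.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- jsonFile(sqlContext, "path/to/file.json")
write.df(df, path = "path/to/output.parquet", source = "parquet", mode = "overwrite")
write.df(df, path = "path/to/output.parquet", source = "parquet", mode = "append")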
@@ -1637,18 +1769,17 @@ setMethod("saveDF",
 #' spark.sql.sources.default will be used.
 #'
 #' Additionally, mode is used to specify the behavior of the save operation when
-#' data already exists in the data source. There are four modes:
-#' append: Contents of this DataFrame are expected to be appended to existing data.
-#' overwrite: Existing data is expected to be overwritten by the contents of
-# this DataFrame.
-#' error: An exception is expected to be thrown.
+#' data already exists in the data source. There are four modes: \cr
+#' append: Contents of this DataFrame are expected to be appended to existing data. \cr
+#' overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
+#' error: An exception is expected to be thrown. \cr
 #' ignore: The save operation is expected to not save the contents of the DataFrame
-# and to not change the existing data.
+#' and to not change the existing data. \cr
 #'
 #' @param df A SparkSQL DataFrame
 #' @param tableName A name for the table
 #' @param source A name for external data source
-#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
 #'
 #' @rdname saveAsTable
 #' @name saveAsTable
R/pkg/R/SQLContext.R

Lines changed: 11 additions & 5 deletions
@@ -452,14 +452,21 @@ dropTempTable <- function(sqlContext, tableName) {
 #'
 #' @param sqlContext SQLContext to use
 #' @param path The path of files to load
-#' @param source the name of external data source
+#' @param source The name of external data source
+#' @param schema The data schema defined in structType
 #' @return DataFrame
+#' @rdname read.df
+#' @name read.df
 #' @export
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlContext <- sparkRSQL.init(sc)
-#' df <- read.df(sqlContext, "path/to/file.json", source = "json")
+#' df1 <- read.df(sqlContext, "path/to/file.json", source = "json")
+#' schema <- structType(structField("name", "string"),
+#'                      structField("info", "map<string,double>"))
+#' df2 <- read.df(sqlContext, mapTypeJsonPath, "json", schema)
+#' df3 <- loadDF(sqlContext, "data/test_table", "parquet", mergeSchema = "true")
 #' }
 
 read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
@@ -482,9 +489,8 @@ read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...)
   dataFrame(sdf)
 }
 
-#' @aliases loadDF
-#' @export
-
+#' @rdname read.df
+#' @name loadDF
 loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
   read.df(sqlContext, path, source, schema, ...)
 }
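Because loadDF now shares read.df's @rdname, both names land on the same help page, and the body above shows that loadDF simply forwards its arguments to read.df. A tiny sketch with placeholder paths:

sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
# Equivalent calls: loadDF is a thin wrapper that delegates to read.df.
df_a <- read.df(sqlContext, "path/to/people.parquet", source = "parquet")
df_b <- loadDF(sqlContext, "path/to/people.parquet", "parquet")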
