Commit 403f966 (2 parents: f2827ea + f21ef8d)

Fix new merge conflict

Merge branch 'master' of https://github.com/apache/spark into sql-pivot

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala

321 files changed: 6,659 additions and 3,315 deletions


R/pkg/NAMESPACE (4 additions, 0 deletions)

@@ -119,6 +119,7 @@ exportMethods("%in%",
               "count",
               "countDistinct",
               "crc32",
+              "cumeDist",
               "date_add",
               "date_format",
               "date_sub",
@@ -150,8 +151,10 @@ exportMethods("%in%",
               "isNaN",
               "isNotNull",
               "isNull",
+              "lag",
               "last",
               "last_day",
+              "lead",
               "least",
               "length",
               "levenshtein",
@@ -177,6 +180,7 @@ exportMethods("%in%",
               "nanvl",
               "negate",
               "next_day",
+              "ntile",
               "otherwise",
               "pmod",
               "quarter",

R/pkg/R/DataFrame.R (149 additions, 18 deletions)

@@ -1457,15 +1457,147 @@ setMethod("join",
             dataFrame(sdf)
           })

-#' @rdname merge
+#'
 #' @name merge
 #' @aliases join
+#' @title Merges two data frames
+#' @param x the first data frame to be joined
+#' @param y the second data frame to be joined
+#' @param by a character vector specifying the join columns. If by is not
+#'   specified, the common column names in \code{x} and \code{y} will be used.
+#' @param by.x a character vector specifying the joining columns for x.
+#' @param by.y a character vector specifying the joining columns for y.
+#' @param all.x a boolean value indicating whether all the rows in x should
+#'   be included in the join
+#' @param all.y a boolean value indicating whether all the rows in y should
+#'   be included in the join
+#' @param sort a logical argument indicating whether the resulting columns should be sorted
+#' @details If all.x and all.y are set to FALSE, a natural join will be returned. If
+#'   all.x is set to TRUE and all.y is set to FALSE, a left outer join will
+#'   be returned. If all.x is set to FALSE and all.y is set to TRUE, a right
+#'   outer join will be returned. If all.x and all.y are set to TRUE, a full
+#'   outer join will be returned.
+#' @rdname merge
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' df1 <- jsonFile(sqlContext, path)
+#' df2 <- jsonFile(sqlContext, path2)
+#' merge(df1, df2) # Performs a Cartesian join
+#' merge(df1, df2, by = "col1") # Performs an inner join based on expression
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
+#' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
+#' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
+#' }
 setMethod("merge",
           signature(x = "DataFrame", y = "DataFrame"),
-          function(x, y, joinExpr = NULL, joinType = NULL, ...) {
-            join(x, y, joinExpr, joinType)
-          })
+          function(x, y, by = intersect(names(x), names(y)), by.x = by, by.y = by,
+                   all = FALSE, all.x = all, all.y = all,
+                   sort = TRUE, suffixes = c("_x", "_y"), ...) {
+
+            if (length(suffixes) != 2) {
+              stop("suffixes must have length 2")
+            }
+
+            # The join type is identified based on the values of all, all.x and all.y.
+            # The default join type is inner; according to R it should be natural, but
+            # since natural joins are not supported in Spark, an inner join is used.
+            joinType <- "inner"
+            if (all || (all.x && all.y)) {
+              joinType <- "outer"
+            } else if (all.x) {
+              joinType <- "left_outer"
+            } else if (all.y) {
+              joinType <- "right_outer"
+            }

+            # The join expression is based on by.x and by.y if both are non-missing,
+            # or on by if by.x or by.y are missing or have different lengths.
+            if (length(by.x) > 0 && length(by.x) == length(by.y)) {
+              joinX <- by.x
+              joinY <- by.y
+            } else if (length(by) > 0) {
+              # If the join columns have the same name in both data frames,
+              # they are used in the join expression.
+              joinX <- by
+              joinY <- by
+            } else {
+              # If by, or both by.x and by.y, have length 0, use a Cartesian product.
+              joinRes <- join(x, y)
+              return(joinRes)
+            }
+
+            # Set aliases to make column names unique in data frames 'x' and 'y'.
+            colsX <- generateAliasesForIntersectedCols(x, by, suffixes[1])
+            colsY <- generateAliasesForIntersectedCols(y, by, suffixes[2])
+
+            # Select columns with their aliases from the data frames
+            # in case the same column names are present in both.
+            xsel <- select(x, colsX)
+            ysel <- select(y, colsY)
+
+            # Generate the join conditions and collect them into a list,
+            # taking the alias names of the columns into account.
+            joinColumns <- lapply(seq_len(length(joinX)), function(i) {
+              colX <- joinX[[i]]
+              colY <- joinY[[i]]
+
+              if (colX %in% by) {
+                colX <- paste(colX, suffixes[1], sep = "")
+              }
+              if (colY %in% by) {
+                colY <- paste(colY, suffixes[2], sep = "")
+              }
+
+              colX <- getColumn(xsel, colX)
+              colY <- getColumn(ysel, colY)
+
+              colX == colY
+            })
+
+            # Concatenate the join conditions with '&' and execute the join.
+            joinExpr <- Reduce("&", joinColumns)
+            joinRes <- join(xsel, ysel, joinExpr, joinType)
+
+            # Sort the result by the 'by' columns if sort = TRUE.
+            if (sort && length(by) > 0) {
+              colNameWithSuffix <- paste(by, suffixes[2], sep = "")
+              joinRes <- do.call("arrange", c(joinRes, colNameWithSuffix, decreasing = FALSE))
+            }
+
+            joinRes
+          })
+
+#'
+#' Creates a list of columns by replacing the intersected ones with aliases.
+#' The name of the alias column is formed by concatenating the original column name and a suffix.
+#'
+#' @param x a DataFrame whose intersected columns are aliased
+#' @param intersectedColNames a list of intersected column names
+#' @param suffix a suffix for the column name
+#' @return list of columns
+#'
+generateAliasesForIntersectedCols <- function(x, intersectedColNames, suffix) {
+  allColNames <- names(x)
+  # Set aliases to make column names unique in data frame 'x'.
+  cols <- lapply(allColNames, function(colName) {
+    col <- getColumn(x, colName)
+    if (colName %in% intersectedColNames) {
+      newJoin <- paste(colName, suffix, sep = "")
+      if (newJoin %in% allColNames) {
+        stop("The column name ", newJoin, " occurs more than once in the 'DataFrame'. ",
+             "Please use different suffixes for the intersected columns.")
+      }
+      col <- alias(col, newJoin)
+    }
+    col
+  })
+  cols
+}

 #' UnionAll
 #'
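As a usage note on the new merge() semantics: a minimal sketch of how all.x/all.y select the Spark join type, assuming two hypothetical DataFrames built as in the roxygen examples above (the file paths and the column "name" are illustrative, not part of the commit).

# Assumes a running SparkR session; paths and column names are illustrative.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df1 <- jsonFile(sqlContext, "people.json")    # e.g. columns: name, dept
df2 <- jsonFile(sqlContext, "salaries.json")  # e.g. columns: name, salary

# all.x = TRUE, all.y = FALSE maps to joinType "left_outer" in the code above.
left <- merge(df1, df2, by = "name", all.x = TRUE)

# The shared join column is kept from both sides under the default suffixes,
# so the result exposes name_x and name_y rather than a single name column.
head(select(left, "name_x", "name_y"))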
@@ -1572,18 +1704,17 @@ setMethod("except",
 #' spark.sql.sources.default will be used.
 #'
 #' Additionally, mode is used to specify the behavior of the save operation when
-#' data already exists in the data source. There are four modes:
-#' append: Contents of this DataFrame are expected to be appended to existing data.
-#' overwrite: Existing data is expected to be overwritten by the contents of
-# this DataFrame.
-#' error: An exception is expected to be thrown.
+#' data already exists in the data source. There are four modes: \cr
+#'  append: Contents of this DataFrame are expected to be appended to existing data. \cr
+#'  overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
+#'  error: An exception is expected to be thrown. \cr
 #' ignore: The save operation is expected to not save the contents of the DataFrame
-# and to not change the existing data.
+#'     and to not change the existing data. \cr
 #'
 #' @param df A SparkSQL DataFrame
 #' @param path A name for the table
 #' @param source A name for external data source
-#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
 #'
 #' @rdname write.df
 #' @name write.df
@@ -1596,6 +1727,7 @@ setMethod("except",
 #' path <- "path/to/file.json"
 #' df <- jsonFile(sqlContext, path)
 #' write.df(df, "myfile", "parquet", "overwrite")
+#' saveDF(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
 #' }
 setMethod("write.df",
           signature(df = "DataFrame", path = "character"),
@@ -1637,18 +1769,17 @@ setMethod("saveDF",
 #' spark.sql.sources.default will be used.
 #'
 #' Additionally, mode is used to specify the behavior of the save operation when
-#' data already exists in the data source. There are four modes:
-#' append: Contents of this DataFrame are expected to be appended to existing data.
-#' overwrite: Existing data is expected to be overwritten by the contents of
-# this DataFrame.
-#' error: An exception is expected to be thrown.
+#' data already exists in the data source. There are four modes: \cr
+#'  append: Contents of this DataFrame are expected to be appended to existing data. \cr
+#'  overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
+#'  error: An exception is expected to be thrown. \cr
 #' ignore: The save operation is expected to not save the contents of the DataFrame
-# and to not change the existing data.
+#'     and to not change the existing data. \cr
 #'
 #' @param df A SparkSQL DataFrame
 #' @param tableName A name for the table
 #' @param source A name for external data source
-#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
+#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
 #'
 #' @rdname saveAsTable
 #' @name saveAsTable
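The four save modes documented above can be exercised directly through write.df() and saveAsTable(); a minimal sketch, assuming an existing SparkR DataFrame df (the output path and table name are illustrative).

# Assumes df is an existing SparkR DataFrame; path and table name are illustrative.
write.df(df, path = "people.parquet", source = "parquet", mode = "overwrite")
write.df(df, path = "people.parquet", source = "parquet", mode = "append")
# saveAsTable() accepts the same modes but writes to a named table instead of a path.
saveAsTable(df, tableName = "people_tbl", source = "parquet", mode = "ignore")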

R/pkg/R/SQLContext.R (11 additions, 5 deletions)

@@ -452,14 +452,21 @@ dropTempTable <- function(sqlContext, tableName) {
 #'
 #' @param sqlContext SQLContext to use
 #' @param path The path of files to load
-#' @param source the name of external data source
+#' @param source The name of external data source
+#' @param schema The data schema defined in structType
 #' @return DataFrame
+#' @rdname read.df
+#' @name read.df
 #' @export
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
 #' sqlContext <- sparkRSQL.init(sc)
-#' df <- read.df(sqlContext, "path/to/file.json", source = "json")
+#' df1 <- read.df(sqlContext, "path/to/file.json", source = "json")
+#' schema <- structType(structField("name", "string"),
+#'                      structField("info", "map<string,double>"))
+#' df2 <- read.df(sqlContext, mapTypeJsonPath, "json", schema)
+#' df3 <- loadDF(sqlContext, "data/test_table", "parquet", mergeSchema = "true")
 #' }

 read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
@@ -482,9 +489,8 @@ read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...)
   dataFrame(sdf)
 }

-#' @aliases loadDF
-#' @export
-
+#' @rdname read.df
+#' @name loadDF
 loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
   read.df(sqlContext, path, source, schema, ...)
 }
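A brief usage sketch of the newly documented schema argument, assuming a SparkR session as in the examples above (the JSON path and field names are illustrative).

# Assumes sqlContext was created with sparkRSQL.init(); path and fields are illustrative.
schema <- structType(structField("name", "string"),
                     structField("age", "double"))
people <- read.df(sqlContext, "people.json", source = "json", schema = schema)
printSchema(people)  # shows the user-supplied schema rather than an inferred one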

R/pkg/R/functions.R (98 additions, 0 deletions)

@@ -2013,3 +2013,101 @@ setMethod("ifelse",
                       "otherwise", no)
             column(jc)
           })
+
+###################### Window functions ######################
+
+#' cumeDist
+#'
+#' Window function: returns the cumulative distribution of values within a window partition,
+#' i.e. the fraction of rows that are below the current row.
+#'
+#'   N = total number of rows in the partition
+#'   cumeDist(x) = number of values before (and including) x / N
+#'
+#' This is equivalent to the CUME_DIST function in SQL.
+#'
+#' @rdname cumeDist
+#' @name cumeDist
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{cumeDist()}
+setMethod("cumeDist",
+          signature(x = "missing"),
+          function() {
+            jc <- callJStatic("org.apache.spark.sql.functions", "cumeDist")
+            column(jc)
+          })
+
+#' lag
+#'
+#' Window function: returns the value that is `offset` rows before the current row, and
+#' `defaultValue` if there are fewer than `offset` rows before the current row. For example,
+#' an `offset` of one will return the previous row at any given point in the window partition.
+#'
+#' This is equivalent to the LAG function in SQL.
+#'
+#' @rdname lag
+#' @name lag
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{lag(df$c)}
+setMethod("lag",
+          signature(x = "characterOrColumn", offset = "numeric", defaultValue = "ANY"),
+          function(x, offset, defaultValue = NULL) {
+            col <- if (class(x) == "Column") {
+              x@jc
+            } else {
+              x
+            }
+
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "lag", col, as.integer(offset), defaultValue)
+            column(jc)
+          })
+
+#' lead
+#'
+#' Window function: returns the value that is `offset` rows after the current row, and
+#' `null` if there are fewer than `offset` rows after the current row. For example,
+#' an `offset` of one will return the next row at any given point in the window partition.
+#'
+#' This is equivalent to the LEAD function in SQL.
+#'
+#' @rdname lead
+#' @name lead
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{lead(df$c)}
+setMethod("lead",
+          signature(x = "characterOrColumn", offset = "numeric", defaultValue = "ANY"),
+          function(x, offset, defaultValue = NULL) {
+            col <- if (class(x) == "Column") {
+              x@jc
+            } else {
+              x
+            }
+
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "lead", col, as.integer(offset), defaultValue)
+            column(jc)
+          })
+
+#' ntile
+#'
+#' Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+#' partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the second
+#' quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
+#'
+#' This is equivalent to the NTILE function in SQL.
+#'
+#' @rdname ntile
+#' @name ntile
+#' @family window_funcs
+#' @export
+#' @examples \dontrun{ntile(1)}
+setMethod("ntile",
+          signature(x = "numeric"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "ntile", as.integer(x))
+            column(jc)
+          })
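The new wrappers only construct Column expressions for the corresponding SQL window functions; this commit does not add an R-side window specification. One hedged way to exercise the same semantics is through the SQL forms over a registered temp table. The sketch below assumes the active context supports window functions (a Hive-enabled context in this Spark line) and that df has illustrative columns dept and salary.

# Assumes window-function support in sqlContext; columns dept and salary are illustrative.
registerTempTable(df, "employees")
ranked <- sql(sqlContext,
              "SELECT dept, salary,
                      lag(salary, 1)  OVER (PARTITION BY dept ORDER BY salary) AS prev_salary,
                      lead(salary, 1) OVER (PARTITION BY dept ORDER BY salary) AS next_salary,
                      ntile(4)        OVER (PARTITION BY dept ORDER BY salary) AS quartile,
                      cume_dist()     OVER (PARTITION BY dept ORDER BY salary) AS cume
               FROM employees")
head(ranked)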
