
Commit c7aa4aa

Merge remote-tracking branch 'upstream/master' into empstring_fix_spark-15125
2 parents: b128fbb + c1f344f

119 files changed, with 1873 additions and 964 deletions.


LICENSE

Lines changed: 1 addition & 1 deletion
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
 (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
 (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.3 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.10.4 - http://py4j.sourceforge.net/)
 (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
 (BSD licence) sbt and sbt-launch-lib.bash
 (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)

R/pkg/NAMESPACE

Lines changed: 2 additions & 1 deletion
@@ -3,7 +3,7 @@
 importFrom("methods", "setGeneric", "setMethod", "setOldClass")
 importFrom("methods", "is", "new", "signature", "show")
 importFrom("stats", "gaussian", "setNames")
-importFrom("utils", "download.file", "packageVersion", "untar")
+importFrom("utils", "download.file", "object.size", "packageVersion", "untar")
 
 # Disable native libraries till we figure out how to package it
 # See SPARKR-7839
@@ -71,6 +71,7 @@ exportMethods("arrange",
               "covar_samp",
               "covar_pop",
               "createOrReplaceTempView",
+              "crossJoin",
               "crosstab",
               "dapply",
               "dapplyCollect",

R/pkg/R/DataFrame.R

Lines changed: 46 additions & 13 deletions
@@ -2271,12 +2271,13 @@ setMethod("dropDuplicates",
 
 #' Join
 #'
-#' Join two SparkDataFrames based on the given join expression.
+#' Joins two SparkDataFrames based on the given join expression.
 #'
 #' @param x A SparkDataFrame
 #' @param y A SparkDataFrame
 #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
-#' Column expression. If joinExpr is omitted, join() will perform a Cartesian join
+#' Column expression. If joinExpr is omitted, the default, inner join is attempted and an error is
+#' thrown if it would be a Cartesian Product. For Cartesian join, use crossJoin instead.
 #' @param joinType The type of join to perform. The following join types are available:
 #' 'inner', 'outer', 'full', 'fullouter', leftouter', 'left_outer', 'left',
 #' 'right_outer', 'rightouter', 'right', and 'leftsemi'. The default joinType is "inner".
@@ -2285,23 +2286,24 @@ setMethod("dropDuplicates",
 #' @aliases join,SparkDataFrame,SparkDataFrame-method
 #' @rdname join
 #' @name join
-#' @seealso \link{merge}
+#' @seealso \link{merge} \link{crossJoin}
 #' @export
 #' @examples
 #'\dontrun{
 #' sparkR.session()
 #' df1 <- read.json(path)
 #' df2 <- read.json(path2)
-#' join(df1, df2) # Performs a Cartesian
 #' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
 #' join(df1, df2, df1$col1 == df2$col2, "right_outer")
+#' join(df1, df2) # Attempts an inner join
 #' }
 #' @note join since 1.4.0
 setMethod("join",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
           function(x, y, joinExpr = NULL, joinType = NULL) {
             if (is.null(joinExpr)) {
-              sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
+              # this may not fail until the planner checks for Cartesian join later on.
+              sdf <- callJMethod(x@sdf, "join", y@sdf)
             } else {
               if (class(joinExpr) != "Column") stop("joinExpr must be a Column")
               if (is.null(joinType)) {
@@ -2322,22 +2324,52 @@ setMethod("join",
             dataFrame(sdf)
           })
 
+#' CrossJoin
+#'
+#' Returns Cartesian Product on two SparkDataFrames.
+#'
+#' @param x A SparkDataFrame
+#' @param y A SparkDataFrame
+#' @return A SparkDataFrame containing the result of the join operation.
+#' @family SparkDataFrame functions
+#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method
+#' @rdname crossJoin
+#' @name crossJoin
+#' @seealso \link{merge} \link{join}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' crossJoin(df1, df2) # Performs a Cartesian
+#' }
+#' @note crossJoin since 2.1.0
+setMethod("crossJoin",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            sdf <- callJMethod(x@sdf, "crossJoin", y@sdf)
+            dataFrame(sdf)
+          })
+
 #' Merges two data frames
 #'
 #' @name merge
-#' @param x the first data frame to be joined
-#' @param y the second data frame to be joined
+#' @param x the first data frame to be joined.
+#' @param y the second data frame to be joined.
 #' @param by a character vector specifying the join columns. If by is not
 #' specified, the common column names in \code{x} and \code{y} will be used.
+#' If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian
+#' Product of x and y will be returned.
 #' @param by.x a character vector specifying the joining columns for x.
 #' @param by.y a character vector specifying the joining columns for y.
 #' @param all a boolean value setting \code{all.x} and \code{all.y}
 #' if any of them are unset.
 #' @param all.x a boolean value indicating whether all the rows in x should
-#' be including in the join
+#' be including in the join.
 #' @param all.y a boolean value indicating whether all the rows in y should
-#' be including in the join
-#' @param sort a logical argument indicating whether the resulting columns should be sorted
+#' be including in the join.
+#' @param sort a logical argument indicating whether the resulting columns should be sorted.
 #' @param suffixes a string vector of length 2 used to make colnames of
 #' \code{x} and \code{y} unique.
 #' The first element is appended to each colname of \code{x}.
@@ -2351,20 +2383,21 @@ setMethod("join",
 #' @family SparkDataFrame functions
 #' @aliases merge,SparkDataFrame,SparkDataFrame-method
 #' @rdname merge
-#' @seealso \link{join}
+#' @seealso \link{join} \link{crossJoin}
 #' @export
 #' @examples
 #'\dontrun{
 #' sparkR.session()
 #' df1 <- read.json(path)
 #' df2 <- read.json(path2)
-#' merge(df1, df2) # Performs a Cartesian
+#' merge(df1, df2) # Performs an inner join by common columns
 #' merge(df1, df2, by = "col1") # Performs an inner join based on expression
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
 #' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
 #' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
+#' merge(df1, df2, by = NULL) # Performs a Cartesian join
 #' }
 #' @note merge since 1.5.0
 setMethod("merge",
@@ -2401,7 +2434,7 @@ setMethod("merge",
               joinY <- by
             } else {
               # if by or both by.x and by.y have length 0, use Cartesian Product
-              joinRes <- join(x, y)
+              joinRes <- crossJoin(x, y)
               return (joinRes)
             }
 
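Taken together, the DataFrame.R changes move the implicit Cartesian product out of join() and into the new crossJoin() method: join() without a join expression now requests a plain inner join from the JVM (which the planner later rejects if it amounts to a Cartesian product), while merge() with by = NULL routes to crossJoin(). A minimal SparkR sketch of the new behavior follows; the data frames and column names are illustrative, and it assumes an active Spark session with the default spark.sql.crossJoin.enabled = false:

library(SparkR)
sparkR.session()

# Two small, hypothetical SparkDataFrames sharing an "id" column.
df1 <- createDataFrame(data.frame(id = 1:3, a = c("x", "y", "z")))
df2 <- createDataFrame(data.frame(id = 2:4, b = c("p", "q", "r")))

# Unchanged: an explicit join expression still performs an inner join.
inner <- join(df1, df2, df1$id == df2$id)
count(inner)                       # 2 matching rows

# New: join() with no expression attempts an inner join; the planner throws
# an AnalysisException if the plan turns out to be a Cartesian product.
# count(join(df1, df2))            # would raise "Detected cartesian product ..."

# New: the Cartesian product has to be requested explicitly.
count(crossJoin(df1, df2))         # 3 * 3 = 9 rows

# merge() keeps its base-R-like interface; by = NULL now dispatches to crossJoin().
count(merge(df1, df2, by = NULL))  # also 9 rows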

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -468,6 +468,10 @@ setGeneric("createOrReplaceTempView",
              standardGeneric("createOrReplaceTempView")
            })
 
+# @rdname crossJoin
+# @export
+setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })
+
 #' @rdname dapply
 #' @export
 setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
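Adding a new verb to SparkDataFrame touches three files in this commit: the S4 generic declared here in generics.R, the setMethod() implementation in DataFrame.R, and the exportMethods() entry in NAMESPACE. A self-contained sketch of that same generic-plus-method pattern, using a hypothetical class and verb rather than SparkR's real ones:

library(methods)

# A stand-in class; SparkDataFrame plays this role in the real package.
setClass("ToyFrame", representation(rows = "data.frame"))

# 1. Declare the generic (what generics.R does for crossJoin).
setGeneric("rowTally", function(x) { standardGeneric("rowTally") })

# 2. Attach an implementation to the class (what DataFrame.R does).
setMethod("rowTally", signature(x = "ToyFrame"), function(x) {
  nrow(x@rows)
})

# 3. A package's NAMESPACE would then list it: exportMethods("rowTally")

tf <- new("ToyFrame", rows = data.frame(a = 1:3))
rowTally(tf)  # 3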

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 23 additions & 3 deletions
@@ -212,7 +212,7 @@ test_that("createDataFrame uses files for large objects", {
   # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
   conf <- callJMethod(sparkSession, "conf")
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
-  df <- createDataFrame(iris)
+  df <- suppressWarnings(createDataFrame(iris))
 
   # Resetting the conf back to default value
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
@@ -390,6 +390,19 @@ test_that("create DataFrame with different data types", {
   expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
 })
 
+test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
+  df <- data.frame(
+    id = 1:2,
+    time = c(as.POSIXlt("2016-01-10"), NA),
+    date = c(as.Date("2016-10-01"), NA))
+
+  DF <- collect(createDataFrame(df))
+  expect_true(is.na(DF$date[2]))
+  expect_equal(DF$date[1], as.Date("2016-10-01"))
+  expect_true(is.na(DF$time[2]))
+  expect_equal(DF$time[1], as.POSIXlt("2016-01-10"))
+})
+
 test_that("create DataFrame with complex types", {
   e <- new.env()
   assign("n", 3L, envir = e)
@@ -1572,7 +1585,7 @@ test_that("filter() on a DataFrame", {
   #expect_true(is.ts(filter(1:100, rep(1, 3)))) # nolint
 })
 
-test_that("join() and merge() on a DataFrame", {
+test_that("join(), crossJoin() and merge() on a DataFrame", {
   df <- read.json(jsonPath)
 
   mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}",
@@ -1583,7 +1596,14 @@ test_that("join() and merge() on a DataFrame", {
   writeLines(mockLines2, jsonPath2)
   df2 <- read.json(jsonPath2)
 
-  joined <- join(df, df2)
+  # inner join, not cartesian join
+  expect_equal(count(where(join(df, df2), df$name == df2$name)), 3)
+  # cartesian join
+  expect_error(tryCatch(count(join(df, df2)), error = function(e) { stop(e) }),
+               paste0(".*(org.apache.spark.sql.AnalysisException: Detected cartesian product for",
+                      " INNER join between logical plans).*"))
+
+  joined <- crossJoin(df, df2)
   expect_equal(names(joined), c("age", "name", "name", "test"))
   expect_equal(count(joined), 12)
   expect_equal(names(collect(joined)), c("age", "name", "name", "test"))
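The new SPARK-17811 test pins down that NA values in Date and timestamp columns survive the round trip through createDataFrame() and collect(). A short sketch of the asserted behavior (assumes an active SparkR session; POSIXct is used here for simplicity, while the test itself builds the column with as.POSIXlt):

library(SparkR)
sparkR.session()

local_df <- data.frame(
  id   = 1:2,
  time = c(as.POSIXct("2016-01-10"), NA),
  date = c(as.Date("2016-10-01"), NA))

collected <- collect(createDataFrame(local_df))

# The NA entries come back as NA rather than erroring or being coerced.
stopifnot(is.na(collected$time[2]), is.na(collected$date[2]))
stopifnot(collected$date[1] == as.Date("2016-10-01"))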

R/run-tests.sh

Lines changed: 4 additions & 2 deletions
@@ -26,6 +26,8 @@ rm -f $LOGFILE
 SPARK_TESTING=1 $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.default.name="file:///" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
 FAILED=$((PIPESTATUS[0]||$FAILED))
 
+NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
+
 # Also run the documentation tests for CRAN
 CRAN_CHECK_LOG_FILE=$FWDIR/cran-check.out
 rm -f $CRAN_CHECK_LOG_FILE
@@ -37,10 +39,10 @@ NUM_CRAN_WARNING="$(grep -c WARNING$ $CRAN_CHECK_LOG_FILE)"
 NUM_CRAN_ERROR="$(grep -c ERROR$ $CRAN_CHECK_LOG_FILE)"
 NUM_CRAN_NOTES="$(grep -c NOTE$ $CRAN_CHECK_LOG_FILE)"
 
-if [[ $FAILED != 0 ]]; then
+if [[ $FAILED != 0 || $NUM_TEST_WARNING != 0 ]]; then
     cat $LOGFILE
     echo -en "\033[31m" # Red
-    echo "Had test failures; see logs."
+    echo "Had test warnings or failures; see logs."
    echo -en "\033[0m" # No color
     exit -1
 else

bin/pyspark

Lines changed: 1 addition & 1 deletion
@@ -57,7 +57,7 @@ export PYSPARK_PYTHON
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.3-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"

bin/pyspark2.cmd

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.3-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.4-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

conf/spark-env.sh.template

Lines changed: 1 addition & 0 deletions
@@ -63,3 +63,4 @@
 # - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
 # - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
 # - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
+# - SPARK_NO_DAEMONIZE Run the proposed command in the foreground. It will not output a PID file.

core/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -331,7 +331,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.10.3</version>
+      <version>0.10.4</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
