
Commit 7b19643

Merge branch 'master' into kerberos-longrunning

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
2 parents: 8a4f268 + 8a53de1


392 files changed: +9006 -2828 lines


R/pkg/R/RDD.R

Lines changed: 5 additions & 13 deletions
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
 
   if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
     # This transformation is the first in its stage:
-    .Object@func <- func
+    .Object@func <- cleanClosure(func)
     .Object@prev_jrdd <- getJRDD(prev)
     .Object@env$prev_serializedMode <- prev@env$serializedMode
     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
     pipelinedFunc <- function(split, iterator) {
       func(split, prev@func(split, iterator))
     }
-    .Object@func <- pipelinedFunc
+    .Object@func <- cleanClosure(pipelinedFunc)
     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
     # Get the serialization mode of the parent RDD
     .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
             return(rdd@env$jrdd_val)
           }
 
-          computeFunc <- function(split, part) {
-            rdd@func(split, part)
-          }
-
           packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                                        connection = NULL)
 
           broadcastArr <- lapply(ls(.broadcastNames),
                                  function(name) { get(name, .broadcastNames) })
 
-          serializedFuncArr <- serialize(computeFunc, connection = NULL)
+          serializedFuncArr <- serialize(rdd@func, connection = NULL)
 
           prev_jrdd <- rdd@prev_jrdd
 
@@ -279,7 +275,7 @@ setMethod("unpersist",
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' setCheckpointDir(sc, "checkpoints")
+#' setCheckpointDir(sc, "checkpoint")
 #' rdd <- parallelize(sc, 1:10, 2L)
 #' checkpoint(rdd)
 #'}
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
 setMethod("lapplyPartitionsWithIndex",
           signature(X = "RDD", FUN = "function"),
           function(X, FUN) {
-            FUN <- cleanClosure(FUN)
-            closureCapturingFunc <- function(split, part) {
-              FUN(split, part)
-            }
-            PipelinedRDD(X, closureCapturingFunc)
+            PipelinedRDD(X, FUN)
           })
 
 #' @rdname lapplyPartitionsWithIndex
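
These RDD.R changes move closure cleaning into the PipelinedRDD constructor, so the function stored in .Object@func already carries copies of its free variables by the time getJRDD serializes it. A minimal sketch of why plain serialize() is not enough (illustrative plain R, not SparkR internals; the variable names are made up): R serializes the global environment by reference, so free variables living there do not travel with the function.

    # Illustrative only: a free variable in the global environment is lost,
    # because serialize() stores globalenv() as a reference, not by value.
    x <- 10
    f <- function(v) v + x
    bits <- serialize(f, connection = NULL)
    # unserialize(bits) in a fresh R session yields a function whose call
    # fails with "object 'x' not found".

    # What cleanClosure-style capture does, conceptually: copy the current
    # values of the free variables into a new environment attached to the
    # function, so they are serialized along with it.
    env <- new.env(parent = globalenv())
    assign("x", x, envir = env)
    environment(f) <- env
    bits <- serialize(f, connection = NULL)  # now carries x by value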

R/pkg/R/context.R

Lines changed: 1 addition & 1 deletion
@@ -216,7 +216,7 @@ broadcast <- function(sc, object) {
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init()
-#' setCheckpointDir(sc, "~/checkpoints")
+#' setCheckpointDir(sc, "~/checkpoint")
 #' rdd <- parallelize(sc, 1:2, 2L)
 #' checkpoint(rdd)
 #'}

R/pkg/R/pairRDD.R

Lines changed: 0 additions & 4 deletions
@@ -694,10 +694,6 @@ setMethod("cogroup",
             for (i in 1:rddsLen) {
               rdds[[i]] <- lapply(rdds[[i]],
                                   function(x) { list(x[[1]], list(i, x[[2]])) })
-              # TODO(hao): As issue [SparkR-142] mentions, the right value of i
-              # will not be captured into UDF if getJRDD is not invoked.
-              # It should be resolved together with that issue.
-              getJRDD(rdds[[i]]) # Capture the closure.
             }
             union.rdd <- Reduce(unionRDD, rdds)
             group.func <- function(vlist) {
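
The deleted getJRDD() call was a workaround for R's closure semantics: every lambda created in the loop shares the loop's environment, so without eager serialization each one would see the final value of i. A small illustration of the underlying behavior (plain R, not SparkR code):

    # All three closures share one environment, so each reads the final i.
    fs <- list()
    for (i in 1:3) fs[[i]] <- function() i
    fs[[1]]()  # returns 3, not 1

    # Snapshotting the value at creation time pins each closure to its own
    # copy, which is what cleanClosure now does when PipelinedRDD wraps the
    # lambda (see the RDD.R change above), making the workaround unnecessary.
    fs <- list()
    for (i in 1:3) fs[[i]] <- local({ j <- i; function() j })
    fs[[1]]()  # returns 1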

R/pkg/inst/tests/test_binaryFile.R

Lines changed: 2 additions & 2 deletions
@@ -27,7 +27,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
   fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
   writeLines(mockFile, fileName1)
 
-  rdd <- textFile(sc, fileName1)
+  rdd <- textFile(sc, fileName1, 1)
   saveAsObjectFile(rdd, fileName2)
   rdd <- objectFile(sc, fileName2)
   expect_equal(collect(rdd), as.list(mockFile))
@@ -40,7 +40,7 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")
 
   l <- list(1, 2, 3)
-  rdd <- parallelize(sc, l)
+  rdd <- parallelize(sc, l, 1)
   saveAsObjectFile(rdd, fileName)
   rdd <- objectFile(sc, fileName)
   expect_equal(collect(rdd), l)

R/pkg/inst/tests/test_rdd.R

Lines changed: 3 additions & 2 deletions
@@ -141,7 +141,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   unpersist(rdd2)
   expect_false(rdd2@env$isCached)
 
-  setCheckpointDir(sc, "checkpoints")
+  tempDir <- tempfile(pattern = "checkpoint")
+  setCheckpointDir(sc, tempDir)
   checkpoint(rdd2)
   expect_true(rdd2@env$isCheckpointed)
 
@@ -152,7 +153,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
   # make sure the data is collectable
   collect(rdd2)
 
-  unlink("checkpoints")
+  unlink(tempDir)
 })
 
 test_that("reduce on RDD", {

R/pkg/inst/tests/test_textFile.R

Lines changed: 2 additions & 2 deletions
@@ -81,7 +81,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
   fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
   writeLines(mockFile, fileName1)
 
-  rdd <- textFile(sc, fileName1)
+  rdd <- textFile(sc, fileName1, 1L)
   saveAsTextFile(rdd, fileName2)
   rdd <- textFile(sc, fileName2)
   expect_equal(collect(rdd), as.list(mockFile))
@@ -93,7 +93,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
 test_that("saveAsTextFile() on a parallelized list works as expected", {
   fileName <- tempfile(pattern="spark-test", fileext=".tmp")
   l <- list(1, 2, 3)
-  rdd <- parallelize(sc, l)
+  rdd <- parallelize(sc, l, 1L)
   saveAsTextFile(rdd, fileName)
   rdd <- textFile(sc, fileName)
   expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))

bagel/src/test/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
@@ -24,4 +24,4 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
 
 # Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.spark-project.jetty=WARN

bin/spark-class

Lines changed: 10 additions & 1 deletion
@@ -82,13 +82,22 @@ if [ $(command -v "$JAR_CMD") ] ; then
   fi
 fi
 
+LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
+
+# Add the launcher build dir to the classpath if requested.
+if [ -n "$SPARK_PREPEND_CLASSES" ]; then
+  LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
+fi
+
+export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
+
 # The launcher library will print arguments separated by a NULL character, to allow arguments with
 # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
 # an array that will be used to exec the final command.
 CMD=()
 while IFS= read -d '' -r ARG; do
   CMD+=("$ARG")
-done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")
+done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
 
 if [ "${CMD[0]}" = "usage" ]; then
   "${CMD[@]}"

bin/spark-class2.cmd

Lines changed: 10 additions & 1 deletion
@@ -46,13 +46,22 @@ if "%SPARK_ASSEMBLY_JAR%"=="0" (
   exit /b 1
 )
 
+set LAUNCH_CLASSPATH=%SPARK_ASSEMBLY_JAR%
+
+rem Add the launcher build dir to the classpath if requested.
+if not "x%SPARK_PREPEND_CLASSES%"=="x" (
+  set LAUNCH_CLASSPATH=%SPARK_HOME%\launcher\target\scala-%SPARK_SCALA_VERSION%\classes;%LAUNCH_CLASSPATH%
+)
+
+set _SPARK_ASSEMBLY=%SPARK_ASSEMBLY_JAR%
+
 rem Figure out where java is.
 set RUNNER=java
 if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
 rem The launcher library prints the command to be executed in a single line suitable for being
 rem executed by the batch interpreter. So read all the output of the launcher into a variable.
-for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %*"') do (
+for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
   set SPARK_CMD=%%i
 )
 %SPARK_CMD%

conf/log4j.properties.template

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
 
 # Settings to quiet third party logs that are too verbose
-log4j.logger.org.eclipse.jetty=WARN
-log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.spark-project.jetty=WARN
+log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
