Commit 8d7ce07

Author: Andrew Or

Merge branch 'master' of github.com:apache/spark into rest

Conflicts:
    core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

2 parents: 6f0c597 + ca66159

117 files changed: 4,919 additions and 467 deletions


assembly/pom.xml

Lines changed: 4 additions & 4 deletions
@@ -39,7 +39,7 @@
     <deb.pkg.name>spark</deb.pkg.name>
     <deb.install.path>/usr/share/spark</deb.install.path>
     <deb.user>root</deb.user>
-    <deb.bin.filemode>744</deb.bin.filemode>
+    <deb.bin.filemode>755</deb.bin.filemode>
   </properties>
 
   <dependencies>
@@ -280,7 +280,7 @@
           <user>${deb.user}</user>
           <group>${deb.user}</group>
           <prefix>${deb.install.path}/conf</prefix>
-          <filemode>744</filemode>
+          <filemode>${deb.bin.filemode}</filemode>
         </mapper>
       </data>
       <data>
@@ -302,7 +302,7 @@
           <user>${deb.user}</user>
           <group>${deb.user}</group>
           <prefix>${deb.install.path}/sbin</prefix>
-          <filemode>744</filemode>
+          <filemode>${deb.bin.filemode}</filemode>
         </mapper>
       </data>
       <data>
@@ -313,7 +313,7 @@
           <user>${deb.user}</user>
           <group>${deb.user}</group>
           <prefix>${deb.install.path}/python</prefix>
-          <filemode>744</filemode>
+          <filemode>${deb.bin.filemode}</filemode>
         </mapper>
       </data>
     </dataSet>
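
The default mode moves from 744 to 755 so that group and other users can execute the packaged scripts, not just the owner (744 is rwxr--r--, 755 is rwxr-xr-x), and the per-entry <filemode> values now reference the single ${deb.bin.filemode} property instead of hard-coding 744. A small Scala sketch of what the two octal modes expand to; the symbolic strings are plain POSIX notation, not anything taken from the build:

    import java.nio.file.attribute.PosixFilePermissions

    // 744 = rwxr--r--  (only the owning user may execute)
    // 755 = rwxr-xr-x  (group and others may execute as well)
    val mode744 = PosixFilePermissions.fromString("rwxr--r--")
    val mode755 = PosixFilePermissions.fromString("rwxr-xr-x")
    println(mode744.size)  // 5 permission bits set
    println(mode755.size)  // 7 bits set: the extras are group and others execute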

bin/spark-shell.cmd

File mode changed: 100755 → 100644

bin/spark-submit2.cmd

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ set ORIG_ARGS=%*
 rem Reset the values of all variables used
 set SPARK_SUBMIT_DEPLOY_MODE=client
 
-if not defined %SPARK_CONF_DIR% (
+if [%SPARK_CONF_DIR%] == [] (
   set SPARK_CONF_DIR=%SPARK_HOME%\conf
 )
 set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf
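
The old test, if not defined %SPARK_CONF_DIR%, expands the variable before the defined check runs, so the check never looks at the variable itself; comparing [%SPARK_CONF_DIR%] against [] behaves correctly whether the variable is unset or empty. For reference, a Scala sketch of the same fallback logic (the variable names mirror the batch file; this is an illustration, not code from the commit):

    // use SPARK_CONF_DIR when set and non-empty, otherwise default to <SPARK_HOME>\conf
    val sparkConfDir: String =
      sys.env.get("SPARK_CONF_DIR").filter(_.nonEmpty)
        .getOrElse(sys.env.getOrElse("SPARK_HOME", ".") + "\\conf")
    val sparkSubmitPropertiesFile = sparkConfDir + "\\spark-defaults.conf"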

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 7 additions & 8 deletions
@@ -76,15 +76,15 @@ private[spark] class ExecutorAllocationManager(
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
     Integer.MAX_VALUE)
 
-  // How long there must be backlogged tasks for before an addition is triggered
+  // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeout = conf.getLong(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", 60)
+    "spark.dynamicAllocation.schedulerBacklogTimeout", 5)
 
   // Same as above, but used only after `schedulerBacklogTimeout` is exceeded
   private val sustainedSchedulerBacklogTimeout = conf.getLong(
     "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
 
-  // How long an executor must be idle for before it is removed
+  // How long an executor must be idle for before it is removed (seconds)
   private val executorIdleTimeout = conf.getLong(
     "spark.dynamicAllocation.executorIdleTimeout", 600)
 
@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
-      val executorId = blockManagerAdded.blockManagerId.executorId
+    override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+      val executorId = executorAdded.executorId
       if (executorId != SparkContext.DRIVER_IDENTIFIER) {
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onBlockManagerRemoved(
-        blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
-      allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+    override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+      allocationManager.onExecutorRemoved(executorRemoved.executorId)
     }
 
     /**
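
With this change the allocation manager learns about executors from the executor-level listener events rather than from block manager registration. A minimal sketch of the same pattern, assuming the SparkListenerExecutorAdded/SparkListenerExecutorRemoved events used in the diff; ExecutorTracker is a made-up example class, not part of Spark:

    import scala.collection.mutable
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

    // keep a set of live executor ids, updated from the two executor-level events
    class ExecutorTracker extends SparkListener {
      private val liveExecutors = mutable.HashSet[String]()

      override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit =
        liveExecutors.synchronized { liveExecutors += event.executorId }

      override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit =
        liveExecutors.synchronized { liveExecutors -= event.executorId }

      def numLiveExecutors: Int = liveExecutors.synchronized { liveExecutors.size }
    }

Registered via sc.addSparkListener(new ExecutorTracker), such a listener sees the same add/remove events the allocation manager now consumes.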

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 18 additions & 2 deletions
@@ -288,7 +288,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // the bound port to the cluster manager properly
   ui.foreach(_.bind())
 
-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  /**
+   * A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
+   *
+   * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
+   * plan to set some global configurations for all Hadoop RDDs.
+   */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
   // Add each JAR given through the constructor
@@ -694,7 +699,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
    * using the older MapReduce API (`org.apache.hadoop.mapred`).
    *
-   * @param conf JobConf for setting up the dataset
+   * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
+   *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+   *             sure you won't modify the conf. A safe approach is always creating a new conf for
+   *             a new RDD.
    * @param inputFormatClass Class of the InputFormat
    * @param keyClass Class of the keys
    * @param valueClass Class of the values
@@ -830,6 +838,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    *
+   * @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
+   *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+   *             sure you won't modify the conf. A safe approach is always creating a new conf for
+   *             a new RDD.
+   * @param fClass Class of the InputFormat
+   * @param kClass Class of the keys
+   * @param vClass Class of the values
+   *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
    * operation will create many references to the same object.
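
The added notes boil down to one rule: the Configuration or JobConf passed in is broadcast once, so mutating it afterwards can leak settings into other RDDs built from the same object. A sketch of the "new conf for a new RDD" approach the note recommends, assuming an existing SparkContext sc and illustrative paths and keys:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // clone the shared hadoopConfiguration instead of mutating it in place
    val confA = new Configuration(sc.hadoopConfiguration)
    confA.set("textinputformat.record.delimiter", "\u0000")  // per-RDD setting, illustrative
    val rddA = sc.newAPIHadoopFile("/data/a", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], confA)

    // a separate conf for the next RDD, so neither sees the other's settings
    val confB = new Configuration(sc.hadoopConfiguration)
    val rddB = sc.newAPIHadoopFile("/data/b", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], confB)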

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 27 additions & 7 deletions
@@ -43,11 +43,20 @@ private[spark] object TestUtils {
    * Note: if this is used during class loader tests, class names should be unique
    * in order to avoid interference between tests.
    */
-  def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
+  def createJarWithClasses(
+      classNames: Seq[String],
+      toStringValue: String = "",
+      classNamesWithBase: Seq[(String, String)] = Seq(),
+      classpathUrls: Seq[URL] = Seq()): URL = {
     val tempDir = Utils.createTempDir()
-    val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
+    val files1 = for (name <- classNames) yield {
+      createCompiledClass(name, tempDir, toStringValue, classpathUrls = classpathUrls)
+    }
+    val files2 = for ((childName, baseName) <- classNamesWithBase) yield {
+      createCompiledClass(childName, tempDir, toStringValue, baseName, classpathUrls)
+    }
     val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
-    createJar(files, jarFile)
+    createJar(files1 ++ files2, jarFile)
   }
 
 
@@ -85,15 +94,26 @@ private[spark] object TestUtils {
   }
 
   /** Creates a compiled class with the given name. Class file will be placed in destDir. */
-  def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
+  def createCompiledClass(
+      className: String,
+      destDir: File,
+      toStringValue: String = "",
+      baseClass: String = null,
+      classpathUrls: Seq[URL] = Seq()): File = {
     val compiler = ToolProvider.getSystemJavaCompiler
+    val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
-      "public class " + className + " implements java.io.Serializable {" +
-      " @Override public String toString() { return \"" + value + "\"; }}")
+      "public class " + className + extendsText + " implements java.io.Serializable {" +
+      " @Override public String toString() { return \"" + toStringValue + "\"; }}")
 
     // Calling this outputs a class file in pwd. It's easier to just rename the file than
     // build a custom FileManager that controls the output location.
-    compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()
+    val options = if (classpathUrls.nonEmpty) {
+      Seq("-classpath", classpathUrls.map { _.getFile }.mkString(File.pathSeparator))
+    } else {
+      Seq()
+    }
+    compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call()
 
     val fileName = className + ".class"
     val result = new File(fileName)
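
A hypothetical use of the extended helpers in a Spark test (the class names are made up, and TestUtils is private[spark], so this only applies inside Spark's own test code): first compile a jar holding a base class, then compile a child class against it.

    // a jar containing only the base class
    val baseJar = TestUtils.createJarWithClasses(classNames = Seq("FakeBase"))

    // a jar whose class extends FakeBase, compiled with the first jar on the classpath
    val childJar = TestUtils.createJarWithClasses(
      classNames = Seq(),
      classNamesWithBase = Seq(("FakeChild", "FakeBase")),
      classpathUrls = Seq(baseJar))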

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 28 additions & 0 deletions
@@ -373,6 +373,15 @@ class JavaSparkContext(val sc: SparkContext)
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    *
+   * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
+   *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+   *             sure you won't modify the conf. A safe approach is always creating a new conf for
+   *             a new RDD.
+   * @param inputFormatClass Class of the InputFormat
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minPartitions Minimum number of Hadoop Splits to generate.
+   *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -395,6 +404,14 @@ class JavaSparkContext(val sc: SparkContext)
    * Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    *
+   * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.
+   *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+   *             sure you won't modify the conf. A safe approach is always creating a new conf for
+   *             a new RDD.
+   * @param inputFormatClass Class of the InputFormat
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -476,6 +493,14 @@ class JavaSparkContext(val sc: SparkContext)
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    *
+   * @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.
+   *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
+   *             sure you won't modify the conf. A safe approach is always creating a new conf for
+   *             a new RDD.
+   * @param fClass Class of the InputFormat
+   * @param kClass Class of the keys
+   * @param vClass Class of the values
+   *
    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
    * record, directly caching the returned RDD will create many references to the same object.
    * If you plan to directly cache Hadoop writable objects, you should first copy them using
@@ -675,6 +700,9 @@ class JavaSparkContext(val sc: SparkContext)
 
   /**
    * Returns the Hadoop configuration used for the Hadoop code (e.g. file systems) we reuse.
+   *
+   * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
+   * plan to set some global configurations for all Hadoop RDDs.
    */
   def hadoopConfiguration(): Configuration = {
     sc.hadoopConfiguration

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 13 additions & 8 deletions
@@ -18,17 +18,16 @@
 package org.apache.spark.deploy
 
 import java.io.{File, PrintStream}
-import java.lang.reflect.{Modifier, InvocationTargetException}
+import java.lang.reflect.{InvocationTargetException, Modifier}
 import java.net.URL
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.ivy.Ivy
 import org.apache.ivy.core.LogOptions
-import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
-import org.apache.ivy.core.module.id.{ModuleId, ArtifactId, ModuleRevisionId}
+import org.apache.ivy.core.module.descriptor._
+import org.apache.ivy.core.module.id.{ArtifactId, ModuleId, ModuleRevisionId}
 import org.apache.ivy.core.report.ResolveReport
 import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
@@ -37,7 +36,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
 
 import org.apache.spark.deploy.rest._
-import org.apache.spark.executor.ExecutorURLClassLoader
+import org.apache.spark.executor._
 import org.apache.spark.util.Utils
 
 /**
@@ -467,8 +466,14 @@
       printStream.println("\n")
     }
 
-    val loader = new ExecutorURLClassLoader(new Array[URL](0),
-      Thread.currentThread.getContextClassLoader)
+    val loader =
+      if (sysProps.getOrElse("spark.files.userClassPathFirst", "false").toBoolean) {
+        new ChildExecutorURLClassLoader(new Array[URL](0),
+          Thread.currentThread.getContextClassLoader)
+      } else {
+        new ExecutorURLClassLoader(new Array[URL](0),
+          Thread.currentThread.getContextClassLoader)
+      }
     Thread.currentThread.setContextClassLoader(loader)
 
     for (jar <- childClasspath) {
@@ -512,7 +517,7 @@
     }
   }
 
-  private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
+  private def addJarToClasspath(localJar: String, loader: MutableURLClassLoader) {
     val uri = Utils.resolveURI(localJar)
     uri.getScheme match {
       case "file" | "local" =>

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 14 additions & 8 deletions
@@ -173,9 +173,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     val logInfos = statusList
       .filter { entry =>
         try {
-          val modTime = getModificationTime(entry)
-          newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-          modTime >= lastModifiedTime
+          getModificationTime(entry).map { time =>
+            newLastModifiedTime = math.max(newLastModifiedTime, time)
+            time >= lastModifiedTime
+          }.getOrElse(false)
         } catch {
           case e: AccessControlException =>
             // Do not use "logInfo" since these messages can get pretty noisy if printed on
@@ -251,7 +252,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         appListener.appName.getOrElse(NOT_STARTED),
         appListener.startTime.getOrElse(-1L),
         appListener.endTime.getOrElse(-1L),
-        getModificationTime(eventLog),
+        getModificationTime(eventLog).get,
         appListener.sparkUser.getOrElse(NOT_STARTED),
         isApplicationCompleted(eventLog))
     } finally {
@@ -310,11 +311,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def isLegacyLogDirectory(entry: FileStatus): Boolean = entry.isDir()
 
-  private def getModificationTime(fsEntry: FileStatus): Long = {
-    if (fsEntry.isDir) {
-      fs.listStatus(fsEntry.getPath).map(_.getModificationTime()).max
+  /**
+   * Returns the modification time of the given event log. If the status points at an empty
+   * directory, `None` is returned, indicating that there isn't an event log at that location.
+   */
+  private def getModificationTime(fsEntry: FileStatus): Option[Long] = {
+    if (isLegacyLogDirectory(fsEntry)) {
+      val statusList = fs.listStatus(fsEntry.getPath)
+      if (!statusList.isEmpty) Some(statusList.map(_.getModificationTime()).max) else None
     } else {
-      fsEntry.getModificationTime()
+      Some(fsEntry.getModificationTime())
     }
   }
 
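
getModificationTime now returns Option[Long] so that an empty legacy log directory, which has no usable timestamp, simply drops out of the scan; that is what the map(...).getOrElse(false) in the filter above expresses. A self-contained sketch of that shape, with a stand-in modTime in place of the real helper:

    // stand-in: None models an entry with no usable timestamp (e.g. an empty directory)
    def modTime(entry: (String, Option[Long])): Option[Long] = entry._2

    val lastModifiedTime = 100L
    val entries = Seq(("empty-dir", None), ("old-log", Some(50L)), ("new-log", Some(150L)))

    // keep only entries whose time is defined and at least lastModifiedTime
    val kept = entries.filter { e =>
      modTime(e).map(_ >= lastModifiedTime).getOrElse(false)
    }
    // kept == Seq(("new-log", Some(150L)))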

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 7 additions & 0 deletions
@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
     val worker: ActorRef,
     val workerId: String,
     val host: String,
+    val webUiPort: Int,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
       // In case we are running this from within the Spark Shell, avoid creating a "scala"
       // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
+
+      // Add webUI log urls
+      val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
+      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
+
       process = builder.start()
       val header = "Spark Executor Command: %s\n%s\n\n".format(
         command.mkString("\"", "\" \"", "\""), "=" * 40)
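
The two environment variables point each executor at its own stdout/stderr pages on the worker web UI. A small sketch of the URL shape being exported; every value below is a made-up placeholder, not something taken from the commit:

    val host = "worker-1.example.com"   // hypothetical worker host
    val webUiPort = 8081                // hypothetical worker UI port
    val appId = "app-20150206-0001"     // hypothetical application id
    val execId = 7                      // hypothetical executor id

    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    println(baseUrl + "stderr")  // value placed in SPARK_LOG_URL_STDERR
    println(baseUrl + "stdout")  // value placed in SPARK_LOG_URL_STDOUT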
