
Commit 2a4225d

sryza authored and tgravescs committed
SPARK-1639. Tidy up some Spark on YARN code
This contains a bunch of small tidyings of the Spark on YARN code. I focused on the yarn stable code. @tgravescs, let me know if you'd like me to make these for the alpha code as well.

Author: Sandy Ryza <[email protected]>

Closes apache#561 from sryza/sandy-spark-1639 and squashes the following commits:

72b6a02 [Sandy Ryza] Fix comment and set name on driver thread
c2190b2 [Sandy Ryza] SPARK-1639. Tidy up some Spark on YARN code
1 parent 6e11930 commit 2a4225d

File tree: 7 files changed, +161 −178 lines changed

7 files changed

+161
-178
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 13 additions & 3 deletions

@@ -277,7 +277,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
         ApplicationMaster.incrementAllocatorLoop(1)
-        Thread.sleep(100)
+        Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
       }
     } finally {
       // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
@@ -416,6 +416,7 @@ object ApplicationMaster {
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
   private val ALLOCATOR_LOOP_WAIT_COUNT = 30
+  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   def incrementAllocatorLoop(by: Int) {
     val count = yarnAllocatorLoop.getAndAdd(by)
@@ -467,13 +468,22 @@ object ApplicationMaster {
       })
     }
 
-    // Wait for initialization to complete and atleast 'some' nodes can get allocated.
+    modified
+  }
+
+
+  /**
+   * Returns when we've either
+   * 1) received all the requested executors,
+   * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
+   * 3) hit an error that causes us to terminate trying to get containers.
+   */
+  def waitForInitialAllocations() {
    yarnAllocatorLoop.synchronized {
      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
        yarnAllocatorLoop.wait(1000L)
      }
    }
-    modified
  }
 
  def main(argStrings: Array[String]) {
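The hunks above extract the allocation busy-wait into a named waitForInitialAllocations() and replace the magic 100 with a named heartbeat constant. For context, here is a minimal, self-contained sketch of the wait/notify handshake behind them; the notifyAll branch is an assumption inferred from the incrementAllocatorLoop line shown, since only the waiting side appears in this diff:

import java.util.concurrent.atomic.AtomicInteger

object AllocatorLoopSketch {
  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
  private val ALLOCATE_HEARTBEAT_INTERVAL = 100 // ms, per the new constant

  private val yarnAllocatorLoop = new AtomicInteger(0)

  // Allocator thread: bump the counter once per heartbeat and wake any
  // waiters once the threshold has been crossed (assumed wake-up logic).
  def incrementAllocatorLoop(by: Int): Unit = {
    val count = yarnAllocatorLoop.getAndAdd(by)
    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
      yarnAllocatorLoop.synchronized {
        yarnAllocatorLoop.notifyAll()
      }
    }
  }

  // Caller thread: block until the allocator loop has ticked enough times,
  // re-checking every second to guard against a missed notification.
  def waitForInitialAllocations(): Unit = {
    yarnAllocatorLoop.synchronized {
      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
        yarnAllocatorLoop.wait(1000L)
      }
    }
  }
}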

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 18 additions & 20 deletions

@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}
@@ -37,7 +36,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
 import org.apache.spark.{Logging, SparkConf, SparkContext}
 
 /**
@@ -169,14 +168,13 @@ trait ClientBase extends Logging {
     destPath
   }
 
-  def qualifyForLocal(localURI: URI): Path = {
+  private def qualifyForLocal(localURI: URI): Path = {
     var qualifiedURI = localURI
-    // If not specified assume these are in the local filesystem to keep behavior like Hadoop
+    // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
     if (qualifiedURI.getScheme() == null) {
       qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
     }
-    val qualPath = new Path(qualifiedURI)
-    qualPath
+    new Path(qualifiedURI)
  }
 
  def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
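The qualifyForLocal hunk tightens visibility and drops a temporary. As a standalone approximation (not the trait method itself, which closes over conf), the logic it implements is:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object QualifyForLocalSketch {
  // A scheme-less URI is resolved against the local filesystem, matching
  // Hadoop's behavior; a URI with a scheme is used as given.
  def qualifyForLocal(localURI: URI, conf: Configuration): Path = {
    val qualifiedURI =
      if (localURI.getScheme == null) {
        new URI(FileSystem.getLocal(conf).makeQualified(new Path(localURI)).toString)
      } else {
        localURI
      }
    new Path(qualifiedURI)
  }
}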
@@ -305,13 +303,13 @@ trait ClientBase extends Logging {
 
     val amMemory = calculateAMMemory(newApp)
 
-    val JAVA_OPTS = ListBuffer[String]()
+    val javaOpts = ListBuffer[String]()
 
     // Add Xmx for AM memory
-    JAVA_OPTS += "-Xmx" + amMemory + "m"
+    javaOpts += "-Xmx" + amMemory + "m"
 
     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += "-Djava.io.tmpdir=" + tmpDir
+    javaOpts += "-Djava.io.tmpdir=" + tmpDir
 
     // TODO: Remove once cpuset version is pushed out.
     // The context is, default gc for server class machines ends up using all cores to do gc -
@@ -325,11 +323,11 @@ trait ClientBase extends Logging {
     if (useConcurrentAndIncrementalGC) {
       // In our expts, using (default) throughput collector has severe perf ramifications in
       // multi-tenant machines
-      JAVA_OPTS += "-XX:+UseConcMarkSweepGC"
-      JAVA_OPTS += "-XX:+CMSIncrementalMode"
-      JAVA_OPTS += "-XX:+CMSIncrementalPacing"
-      JAVA_OPTS += "-XX:CMSIncrementalDutyCycleMin=0"
-      JAVA_OPTS += "-XX:CMSIncrementalDutyCycle=10"
+      javaOpts += "-XX:+UseConcMarkSweepGC"
+      javaOpts += "-XX:+CMSIncrementalMode"
+      javaOpts += "-XX:+CMSIncrementalPacing"
+      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
     }
 
     // SPARK_JAVA_OPTS is deprecated, but for backwards compatibility:
@@ -344,22 +342,22 @@ trait ClientBase extends Logging {
       // If we are being launched in client mode, forward the spark-conf options
       // onto the executor launcher
       for ((k, v) <- sparkConf.getAll) {
-        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
     } else {
       // If we are being launched in standalone mode, capture and forward any spark
       // system properties (e.g. set by spark-class).
       for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
-        JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\""
+        javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
       }
-      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => JAVA_OPTS += opts)
-      sys.props.get("spark.driver.libraryPath").foreach(p => JAVA_OPTS += s"-Djava.library.path=$p")
+      sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts)
+      sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p")
     }
-    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
-      JAVA_OPTS ++
+      javaOpts ++
       Seq(args.amClass, "--class", args.userClass, "--jar ", args.userJar,
         userArgsToString(args),
         "--executor-memory", args.executorMemory.toString,

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala

Lines changed: 14 additions & 14 deletions

@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{Logging, SparkConf}
 
@@ -46,30 +46,30 @@ trait ExecutorRunnableUtil extends Logging {
       executorCores: Int,
       localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
-    val JAVA_OPTS = ListBuffer[String]()
+    val javaOpts = ListBuffer[String]()
     // Set the JVM memory
     val executorMemoryString = executorMemory + "m"
-    JAVA_OPTS += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
+    javaOpts += "-Xms" + executorMemoryString + " -Xmx" + executorMemoryString + " "
 
     // Set extra Java options for the executor, if defined
     sys.props.get("spark.executor.extraJavaOptions").foreach { opts =>
-      JAVA_OPTS += opts
+      javaOpts += opts
     }
 
-    JAVA_OPTS += "-Djava.io.tmpdir=" +
+    javaOpts += "-Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
-    JAVA_OPTS += ClientBase.getLog4jConfiguration(localResources)
+    javaOpts += ClientBase.getLog4jConfiguration(localResources)
 
     // Certain configs need to be passed here because they are needed before the Executor
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
     sparkConf.getAll.
       filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
 
     sparkConf.getAkkaConf.
-      foreach { case (k, v) => JAVA_OPTS += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
@@ -88,11 +88,11 @@ trait ExecutorRunnableUtil extends Logging {
       // multi-tennent machines
       // The options are based on
       // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
-      JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
-      JAVA_OPTS += " -XX:+CMSIncrementalMode "
-      JAVA_OPTS += " -XX:+CMSIncrementalPacing "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
-      JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
+      javaOpts += " -XX:+UseConcMarkSweepGC "
+      javaOpts += " -XX:+CMSIncrementalMode "
+      javaOpts += " -XX:+CMSIncrementalPacing "
+      javaOpts += " -XX:CMSIncrementalDutyCycleMin=0 "
+      javaOpts += " -XX:CMSIncrementalDutyCycle=10 "
     }
   */
 
@@ -104,7 +104,7 @@ trait ExecutorRunnableUtil extends Logging {
       // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
       // 'something' to fail job ... akin to blacklisting trackers in mapred ?
       "-XX:OnOutOfMemoryError='kill %p'") ++
-      JAVA_OPTS ++
+      javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
         masterAddress.toString,
         slaveId.toString,
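One detail worth noting in this file: only spark.auth* and spark.akka* settings are forwarded on the executor command line, because (per the comment in the diff) they are needed before the executor registers and receives the rest of the config over Akka. A runnable sketch of just that filter, with stand-in config data:

object ExecutorOptsSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for sparkConf.getAll; keys and values are made up.
    val sparkConf = Seq(
      "spark.akka.frameSize" -> "10",
      "spark.authenticate"   -> "true",
      "spark.app.name"       -> "demo") // not forwarded: arrives after registration

    val javaOpts = sparkConf
      .filter { case (k, _) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }
      .map { case (k, v) => "-D" + k + "=" + "\\\"" + v + "\\\"" }

    javaOpts.foreach(println) // -Dspark.akka.frameSize=\"10\", -Dspark.authenticate=\"true\"
  }
}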

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala

Lines changed: 6 additions & 4 deletions

@@ -33,10 +33,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
 
   def this(sc: SparkContext) = this(sc, new Configuration())
 
-  // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
-  // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
-  // Subsequent creations are ignored - since nodes are already allocated by then.
-
+  // Nothing else for now ... initialize application master : which needs a SparkContext to
+  // determine how to allocate.
+  // Note that only the first creation of a SparkContext influences (and ideally, there must be
+  // only one SparkContext, right ?). Subsequent creations are ignored since executors are already
+  // allocated by then.
 
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
@@ -48,6 +49,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   override def postStartHook() {
     val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
     if (sparkContextInitialized){
+      ApplicationMaster.waitForInitialAllocations()
       // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
       Thread.sleep(3000L)
     }
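The added call changes startup from a fixed three-second sleep alone to an explicit wait for the initial container allocations, with the sleep kept as a grace period for executor registration. A simplified sketch of the resulting ordering (waitForInitialAllocations is stubbed here; the real one lives on the ApplicationMaster object, as defined in the first file of this commit):

object PostStartHookSketch {
  // Stub standing in for ApplicationMaster.waitForInitialAllocations().
  def waitForInitialAllocations(): Unit = ()

  def postStartHook(sparkContextInitialized: Boolean): Unit = {
    if (sparkContextInitialized) {
      waitForInitialAllocations() // 1) block until initial executors are allocated
      Thread.sleep(3000L)         // 2) grace period for executors to register
    }
  }
}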
