Skip to content

Commit 3dd8af7

Browse files
li-zhihui authored and tgravescs committed
[SPARK-1946] Submit tasks after (configured ratio) executors have been registered
Because submitting tasks and registering executors are asynchronous, in most situation, early stages' tasks run without preferred locality. A simple solution is sleeping few seconds in application, so that executors have enough time to register. The PR add 2 configuration properties to make TaskScheduler submit tasks after a few of executors have been registered. \# Submit tasks only after (registered executors / total executors) arrived the ratio, default value is 0 spark.scheduler.minRegisteredExecutorsRatio = 0.8 \# Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the maxRegisteredWaitingTime(millisecond), default value is 30000 spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000 Author: li-zhihui <[email protected]> Closes apache#900 from li-zhihui/master and squashes the following commits: b9f8326 [li-zhihui] Add logs & edit docs 1ac08b1 [li-zhihui] Add new configs to user docs 22ead12 [li-zhihui] Move waitBackendReady to postStartHook c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS 4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor 0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks 4261454 [li-zhihui] Add docs for new configs & code style ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime 6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha 812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode e7b6272 [li-zhihui] support yarn-cluster 37f7dc2 [li-zhihui] support yarn mode(percentage style) 3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered
1 parent d60b09b commit 3dd8af7

File tree

13 files changed

+127
-2
lines changed

13 files changed

+127
-2
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1531,7 +1531,16 @@ object SparkContext extends Logging {
15311531
throw new SparkException("YARN mode not available ?", e)
15321532
}
15331533
}
1534-
val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
1534+
val backend = try {
1535+
val clazz =
1536+
Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
1537+
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
1538+
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
1539+
} catch {
1540+
case e: Exception => {
1541+
throw new SparkException("YARN mode not available ?", e)
1542+
}
1543+
}
15351544
scheduler.initialize(backend)
15361545
scheduler
15371546

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
3030

3131
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
3232
throw new UnsupportedOperationException
33+
def isReady(): Boolean = true
3334
}

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
145145
}
146146
}
147147

148+
override def postStartHook() {
149+
waitBackendReady()
150+
}
151+
148152
override def submitTasks(taskSet: TaskSet) {
149153
val tasks = taskSet.tasks
150154
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl(
437441

438442
// By default, rack is unknown
439443
def getRackForHost(value: String): Option[String] = None
444+
445+
private def waitBackendReady(): Unit = {
446+
if (backend.isReady) {
447+
return
448+
}
449+
while (!backend.isReady) {
450+
synchronized {
451+
this.wait(100)
452+
}
453+
}
454+
}
440455
}
441456

442457

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
4646
{
4747
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
4848
var totalCoreCount = new AtomicInteger(0)
49+
var totalExpectedExecutors = new AtomicInteger(0)
4950
val conf = scheduler.sc.conf
5051
private val timeout = AkkaUtils.askTimeout(conf)
5152
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
53+
// Submit tasks only after (registered executors / total expected executors)
54+
// is equal to at least this value, which is a double between 0 and 1.
55+
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
56+
if (minRegisteredRatio > 1) minRegisteredRatio = 1
57+
// Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks after this time (milliseconds).
58+
val maxRegisteredWaitingTime =
59+
conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
60+
val createTime = System.currentTimeMillis()
61+
var ready = if (minRegisteredRatio <= 0) true else false
5262

5363
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
5464
private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +93,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
8393
executorAddress(executorId) = sender.path.address
8494
addressToExecutorId(sender.path.address) = executorId
8595
totalCoreCount.addAndGet(cores)
96+
if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
97+
ready = true
98+
logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
99+
executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
100+
", minRegisteredExecutorsRatio: " + minRegisteredRatio)
101+
}
86102
makeOffers()
87103
}
88104

@@ -247,6 +263,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
247263
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
248264
}
249265
}
266+
267+
override def isReady(): Boolean = {
268+
if (ready) {
269+
return true
270+
}
271+
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
272+
ready = true
273+
logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
274+
"maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
275+
return true
276+
}
277+
false
278+
}
250279
}
251280

252281
private[spark] object CoarseGrainedSchedulerBackend {

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
9595

9696
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
9797
memory: Int) {
98+
totalExpectedExecutors.addAndGet(1)
9899
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
99100
fullId, hostPort, cores, Utils.megabytesToString(memory)))
100101
}

docs/configuration.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -699,6 +699,25 @@ Apart from these, the following properties are also available, and may be useful
699699
(in milliseconds)
700700
</td>
701701
</tr>
702+
<tr>
703+
<td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
704+
<td>0</td>
705+
<td>
706+
The minimum ratio of registered executors (registered executors / total expected executors)
707+
to wait for before scheduling begins. Specified as a double between 0 and 1.
708+
Regardless of whether the minimum ratio of executors has been reached,
709+
the maximum amount of time it will wait before scheduling begins is controlled by config
710+
<code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>
711+
</td>
712+
</tr>
713+
<tr>
714+
<td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
715+
<td>30000</td>
716+
<td>
717+
Maximum amount of time to wait for executors to register before scheduling begins
718+
(in milliseconds).
719+
</td>
720+
</tr>
702721
</table>
703722

704723
#### Security

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
184184

185185
private def startUserClass(): Thread = {
186186
logInfo("Starting the user JAR in a separate Thread")
187+
System.setProperty("spark.executor.instances", args.numExecutors.toString)
187188
val mainMethod = Class.forName(
188189
args.userClass,
189190
false /* initialize */ ,

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
2626
var userArgs: Seq[String] = Seq[String]()
2727
var executorMemory = 1024
2828
var executorCores = 1
29-
var numExecutors = 2
29+
var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
3030

3131
parseArgs(args.toList)
3232

@@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
9393
System.exit(exitCode)
9494
}
9595
}
96+
97+
object ApplicationMasterArguments {
98+
val DEFAULT_NUMBER_EXECUTORS = 2
99+
}

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
4040

4141
override def postStartHook() {
4242

43+
super.postStartHook()
4344
// The yarn application is running, but the executor might not yet be ready
4445
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
46+
// TODO: This sleep should no longer be needed after waitBackendReady
4547
Thread.sleep(2000L)
4648
logInfo("YarnClientClusterScheduler.postStartHook done")
4749
}

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ private[spark] class YarnClientSchedulerBackend(
7575

7676
logDebug("ClientArguments called with: " + argsArrayBuf)
7777
val args = new ClientArguments(argsArrayBuf.toArray, conf)
78+
totalExpectedExecutors.set(args.numExecutors)
7879
client = new Client(args, conf)
7980
appId = client.runApp()
8081
waitForApp()

0 commit comments

Comments
 (0)