Skip to content

Commit 37f7dc2

Browse files
committed
support yarn mode(percentage style)
1 parent 3f8c941 commit 37f7dc2

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
5050
val conf = scheduler.sc.conf
5151
private val timeout = AkkaUtils.askTimeout(conf)
5252
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
53-
val registeredRatio = conf.getDouble("spark.executor.registeredRatio", 0)
54-
val maxRegisteredWaitingTime = conf.getInt("spark.executor.maxRegisteredWaitingTime", 10000)
53+
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredRatio", 0)
54+
val maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredWaitingTime", 10000)
5555
val createTime = System.currentTimeMillis()
56-
var ready = if(registeredRatio==0)true else false
56+
var ready = if (minRegisteredRatio <= 0) true else false
5757

5858
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
5959
private val executorActor = new HashMap[String, ActorRef]
@@ -88,7 +88,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
8888
executorAddress(executorId) = sender.path.address
8989
addressToExecutorId(sender.path.address) = executorId
9090
totalCoreCount.addAndGet(cores)
91-
if (executorActor.size >= totalExecutors.get() * registeredRatio) {
91+
if (executorActor.size >= totalExecutors.get() * minRegisteredRatio) {
9292
ready = true
9393
}
9494
makeOffers()

yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(
7777

7878
logDebug("ClientArguments called with: " + argsArrayBuf)
7979
val args = new ClientArguments(argsArrayBuf.toArray, conf)
80+
totalExecutors.set(args.numExecutors)
81+
// reset default minRegisteredRatio for yarn mode
82+
if (minRegisteredRatio == 0) {
83+
minRegisteredRatio = 0.9
84+
ready = false
85+
}
8086
client = new Client(args, conf)
8187
appId = client.runApp()
8288
waitForApp()

0 commit comments

Comments
 (0)