Commit 45aac34

Refactoring RetrieveLastAllocatedExecutorId

1 parent 0c88e8d

8 files changed: +50 -37 lines changed


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 0 additions & 4 deletions

@@ -249,10 +249,6 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
-    numExecutorsTarget = initialNumExecutors
-    numExecutorsToAdd = 1
-
     executorsPendingToRemove.clear()
     removeTimes.clear()
   }

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ private[spark] object CoarseGrainedClusterMessages {
       ioEncryptionKey: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage
 
-  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
+  case object GetAMInitialState extends CoarseGrainedClusterMessage
 
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
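
The message type itself is only renamed here; both the old and the new form are Scala case objects, which suit RPC messages because they are serializable singletons that pattern-match cleanly. A toy dispatcher in the same style (all names below are invented for illustration, not Spark's):

sealed trait ClusterMessage
case object GetState extends ClusterMessage
final case class SetTarget(n: Int) extends ClusterMessage

object Dispatch {
  // Handlers match on the message shape, just as the scheduler endpoints do.
  def handle(msg: ClusterMessage): String = msg match {
    case GetState     => "replying with current state"
    case SetTarget(n) => s"target set to $n"
  }
}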

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 6 deletions

@@ -93,9 +93,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var localityAwareTasks = 0
 
-  // The num of current max ExecutorId used to re-register appMaster
-  @volatile protected var currentExecutorIdCounter = 0
-
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {

@@ -184,9 +181,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
-          if (currentExecutorIdCounter < executorId.toInt) {
-            currentExecutorIdCounter = executorId.toInt
-          }
+          setCurrentExecutorIdCounter(executorId.toInt)
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")

@@ -637,6 +632,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     defaultAskTimeout.awaitResult(response)
   }
 
+  // Set the num of current max ExecutorId used to re-register appMaster
+  protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {}
+
   /**
    * Kill the given list of executors through the cluster manager.
    * @return whether the kill request is acknowledged.
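
Replacing the inline counter update with a call to setCurrentExecutorIdCounter, together with the new no-op default near the bottom of the class, is a template-method hook: the generic backend invokes the method while holding its lock, and a cluster-manager-specific subclass decides what, if anything, to record. A minimal standalone sketch of that pattern (illustrative names, not Spark's classes):

// Sketch of the hook pattern: the base class calls a protected no-op
// that subclasses may override (illustrative names).
class GenericBackend {
  // Default does nothing; most cluster managers don't need the counter.
  protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {}

  def registerExecutor(executorId: Int): Unit = synchronized {
    // ...common bookkeeping shared by all backends...
    setCurrentExecutorIdCounter(executorId) // hook runs under the same monitor
  }
}

class YarnLikeBackend extends GenericBackend {
  private var maxSeenId = 0
  override protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {
    if (maxSeenId < executorId) maxSeenId = executorId
  }
}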

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 13 additions & 1 deletion

@@ -358,14 +358,26 @@ private[spark] class ApplicationMaster(
       dummyRunner.launchContextDebugInfo()
     }
 
+    /**
+     * (executorIdCounter, requestExecutors) should be the initial state
+     * or the last state before the AM restarted.
+     *
+     * @see SPARK-12864, SPARK-20079
+     */
+    val (executorIdCounter, requestExecutors) =
+      driverRef.askSync[(Int, RequestExecutors)](GetAMInitialState)
     allocator = client.register(driverUrl,
       driverRef,
       yarnConf,
       _sparkConf,
       uiAddress,
       historyAddress,
       securityMgr,
-      localResources)
+      localResources,
+      executorIdCounter)
+    if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) {
+      amEndpoint.send(requestExecutors)
+    }
 
     allocator.allocateResources()
     reporterThread = launchReporterThread()
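
The askSync call blocks the AM until the driver replies with the pair, so registration always proceeds from a consistent snapshot. A simplified stand-in for that handshake, using a plain stub instead of Spark's RpcEndpointRef (all names below are invented; the real reply carries a RequestExecutors message):

// Simplified stand-in for the AM/driver handshake (invented names).
case object GetInitialState

final case class InitialState(executorIdCounter: Int, requestedTotal: Int)

class DriverStub {
  // Plays the role of the driver endpoint answering GetAMInitialState.
  def askSync(msg: Any): InitialState = msg match {
    case GetInitialState => InitialState(executorIdCounter = 42, requestedTotal = 6)
  }
}

object AmStartup extends App {
  val driver = new DriverStub
  // Block for the snapshot before registering with the resource manager.
  val state = driver.askSync(GetInitialState)
  println(s"resuming with counter=${state.executorIdCounter}, target=${state.requestedTotal}")
}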

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 4 additions & 18 deletions

@@ -40,7 +40,6 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
 
 /**

@@ -65,7 +64,8 @@ private[yarn] class YarnAllocator(
     appAttemptId: ApplicationAttemptId,
     securityMgr: SecurityManager,
     localResources: Map[String, LocalResource],
-    resolver: SparkRackResolver)
+    resolver: SparkRackResolver,
+    private var executorIdCounter: Int)
   extends Logging {
 
   import YarnAllocator._

@@ -82,22 +82,6 @@ private[yarn] class YarnAllocator(
 
   @volatile private var numExecutorsRunning = 0
 
-  /**
-   * Used to generate a unique ID per executor
-   *
-   * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
-   * the id of new executor will start from 1, this will conflict with the executor has
-   * already created before. So, we should initialize the `executorIdCounter` by getting
-   * the max executorId from driver.
-   *
-   * And this situation of executorId conflict is just in yarn client mode, so this is an issue
-   * in yarn client mode. For more details, can check in jira.
-   *
-   * @see SPARK-12864
-   */
-  private var executorIdCounter: Int =
-    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
-
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()

@@ -163,6 +147,8 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }
 
+  def getTargetNumExecutors: Int = targetNumExecutors
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
   def getNumExecutorsFailed: Int = synchronized {
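
Taking executorIdCounter as a constructor parameter, rather than issuing a blocking askSync inside the allocator, also makes the class easier to construct in isolation; the test-suite change at the end of this commit simply passes 0. A condensed before/after sketch under hypothetical names:

// Condensed contrast of self-initializing vs. injected state (hypothetical names).
trait Driver { def askLastExecutorId(): Int }

// Before: a blocking ask in the constructor means every unit test
// must supply a live driver endpoint just to build the object.
class AllocatorBefore(driver: Driver) {
  private var executorIdCounter: Int = driver.askLastExecutorId()
}

// After: the caller supplies the starting value; tests pass a literal.
class AllocatorAfter(private var executorIdCounter: Int)

object Demo {
  val underTest = new AllocatorAfter(0) // no driver stub required
}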

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 3 additions & 2 deletions

@@ -58,7 +58,8 @@ private[spark] class YarnRMClient extends Logging {
       uiAddress: Option[String],
       uiHistoryAddress: String,
       securityMgr: SecurityManager,
-      localResources: Map[String, LocalResource]
+      localResources: Map[String, LocalResource],
+      executorIdCounter: Int
     ): YarnAllocator = {
     amClient = AMRMClient.createAMRMClient()
     amClient.init(conf)

@@ -75,7 +76,7 @@ private[spark] class YarnRMClient extends Logging {
       registered = true
     }
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
-      localResources, new SparkRackResolver())
+      localResources, new SparkRackResolver(), executorIdCounter)
   }
 
   /**

resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 23 additions & 4 deletions

@@ -29,7 +29,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, ThreadUtils}
+import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
 /**
  * Abstract Yarn scheduler backend that contains common logic

@@ -68,6 +68,12 @@ private[spark] abstract class YarnSchedulerBackend(
   // Flag to specify whether this schedulerBackend should be reset.
   private var shouldResetOnAmRegister = false
 
+  private val currentState = new CurrentAMState(0,
+    RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty))
+
+  protected class CurrentAMState(
+    var executorIdCounter: Int,
+    var requestExecutors: RequestExecutors)
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *

@@ -135,7 +141,20 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
-    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
+    val requestExecutors = prepareRequestExecutors(requestedTotal)
+    val future = yarnSchedulerEndpointRef.ask[Boolean](requestExecutors)
+    setCurrentRequestExecutors(requestExecutors)
+    future
+  }
+
+  override def setCurrentExecutorIdCounter(executorId: Int): Unit = synchronized {
+    if (currentState.executorIdCounter < executorId.toInt) {
+      currentState.executorIdCounter = executorId.toInt
+    }
+  }
+
+  def setCurrentRequestExecutors(requestExecutors: RequestExecutors): Unit = synchronized {
+    currentState.requestExecutors = requestExecutors
   }
 
   /**

@@ -305,8 +324,8 @@ private[spark] abstract class YarnSchedulerBackend(
         context.reply(false)
       }
 
-      case RetrieveLastAllocatedExecutorId =>
-        context.reply(currentExecutorIdCounter)
+      case GetAMInitialState =>
+        context.reply((currentState.executorIdCounter, currentState.requestExecutors))
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
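
Both setters synchronize on the backend, so updates to the counter and to the last RequestExecutors are serialized behind one monitor. A self-contained sketch of keeping a mutable pair behind a single lock (illustrative names, not the Spark classes):

// Illustrative sketch of the guarded-pair idiom used by YarnSchedulerBackend.
final class AmStateHolder {
  private final class State(var executorIdCounter: Int, var requestedTotal: Int)
  private val state = new State(0, 0)

  def recordExecutorId(id: Int): Unit = synchronized {
    if (state.executorIdCounter < id) state.executorIdCounter = id
  }

  def recordRequestedTotal(total: Int): Unit = synchronized {
    state.requestedTotal = total
  }

  // Reads both fields under the same lock, so callers never see a torn pair.
  def snapshot: (Int, Int) = synchronized {
    (state.executorIdCounter, state.requestedTotal)
  }
}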

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -101,7 +101,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       appAttemptId,
       new SecurityManager(sparkConf),
       Map(),
-      new MockResolver())
+      new MockResolver(),
+      0)
   }
 
   def createContainer(host: String): Container = {
