From 45aac34d312bd80d4e67ab41939cebca8164868b Mon Sep 17 00:00:00 2001
From: Guoqiang Li
Date: Wed, 14 Jun 2017 23:08:06 +0800
Subject: [PATCH] Refactoring RetrieveLastAllocatedExecutorId

---
 .../spark/ExecutorAllocationManager.scala       |  4 ---
 .../cluster/CoarseGrainedClusterMessage.scala   |  2 +-
 .../CoarseGrainedSchedulerBackend.scala         | 10 +++----
 .../spark/deploy/yarn/ApplicationMaster.scala   | 14 +++++++++-
 .../spark/deploy/yarn/YarnAllocator.scala       | 22 +++------------
 .../spark/deploy/yarn/YarnRMClient.scala        |  5 ++--
 .../cluster/YarnSchedulerBackend.scala          | 27 ++++++++++++++++---
 .../deploy/yarn/YarnAllocatorSuite.scala        |  3 ++-
 8 files changed, 50 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index fcc72ff49276..a04c0cc4872c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -249,10 +249,6 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
-    numExecutorsTarget = initialNumExecutors
-    numExecutorsToAdd = 1
-    executorsPendingToRemove.clear()
     removeTimes.clear()
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6b49bd699a13..aa3466047ab0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -35,7 +35,7 @@ private[spark] object CoarseGrainedClusterMessages {
       ioEncryptionKey: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage

-  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
+  case object GetAMInitialState extends CoarseGrainedClusterMessage

   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dc82bb770472..5a0355bef954 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -93,9 +93,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var localityAwareTasks = 0

-  // The num of current max ExecutorId used to re-register appMaster
-  @volatile protected var currentExecutorIdCounter = 0
-
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {

@@ -184,9 +181,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
-          if (currentExecutorIdCounter < executorId.toInt) {
-            currentExecutorIdCounter = executorId.toInt
-          }
+          setCurrentExecutorIdCounter(executorId.toInt)
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
@@ -637,6 +632,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     defaultAskTimeout.awaitResult(response)
   }

+  // Record the current max executor ID; a no-op here, overridden by the YARN backend
+  protected def setCurrentExecutorIdCounter(executorId: Int): Unit = {}
+
   /**
    * Kill the given list of executors through the cluster manager.
    * @return whether the kill request is acknowledged.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6da2c0b5f330..5da490b7f2a4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -358,6 +358,14 @@ private[spark] class ApplicationMaster(
       dummyRunner.launchContextDebugInfo()
     }

+    /**
+     * (executorIdCounter, requestExecutors) is either the initial state
+     * or the last state before the AM restarted.
+     *
+     * @see SPARK-12864, SPARK-20079
+     */
+    val (executorIdCounter, requestExecutors) =
+      driverRef.askSync[(Int, RequestExecutors)](GetAMInitialState)
     allocator = client.register(driverUrl,
       driverRef,
       yarnConf,
@@ -365,7 +373,11 @@ private[spark] class ApplicationMaster(
       uiAddress,
       historyAddress,
       securityMgr,
-      localResources)
+      localResources,
+      executorIdCounter)
+    if (requestExecutors.requestedTotal != allocator.getTargetNumExecutors) {
+      amEndpoint.send(requestExecutors)
+    }

     allocator.allocateResources()
     reporterThread = launchReporterThread()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ed77a6e4a1c7..00a75d32bc38 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -40,7 +40,6 @@ import org.apache.spark.internal.config._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}

 /**
@@ -65,7 +64,8 @@ private[yarn] class YarnAllocator(
     appAttemptId: ApplicationAttemptId,
     securityMgr: SecurityManager,
     localResources: Map[String, LocalResource],
-    resolver: SparkRackResolver)
+    resolver: SparkRackResolver,
+    private var executorIdCounter: Int)
   extends Logging {

   import YarnAllocator._
@@ -82,22 +82,6 @@ private[yarn] class YarnAllocator(

   @volatile private var numExecutorsRunning = 0

-  /**
-   * Used to generate a unique ID per executor
-   *
-   * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then
-   * the id of new executor will start from 1, this will conflict with the executor has
-   * already created before. So, we should initialize the `executorIdCounter` by getting
-   * the max executorId from driver.
-   *
-   * And this situation of executorId conflict is just in yarn client mode, so this is an issue
-   * in yarn client mode. For more details, can check in jira.
-   *
-   * @see SPARK-12864
-   */
-  private var executorIdCounter: Int =
-    driverRef.askSync[Int](RetrieveLastAllocatedExecutorId)
-
   // Queue to store the timestamp of failed executors
   private val failedExecutorsTimeStamps = new Queue[Long]()

@@ -163,6 +147,8 @@ private[yarn] class YarnAllocator(
     clock = newClock
   }

+  def getTargetNumExecutors: Int = targetNumExecutors
+
   def getNumExecutorsRunning: Int = numExecutorsRunning

   def getNumExecutorsFailed: Int = synchronized {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 72f4d273ab53..4af2b163189e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -58,7 +58,8 @@ private[spark] class YarnRMClient extends Logging {
       uiAddress: Option[String],
       uiHistoryAddress: String,
       securityMgr: SecurityManager,
-      localResources: Map[String, LocalResource]
+      localResources: Map[String, LocalResource],
+      executorIdCounter: Int
     ): YarnAllocator = {
     amClient = AMRMClient.createAMRMClient()
     amClient.init(conf)
@@ -75,7 +76,7 @@ private[spark] class YarnRMClient extends Logging {
       registered = true
     }
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
-      localResources, new SparkRackResolver())
+      localResources, new SparkRackResolver(), executorIdCounter)
   }

   /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index cbc6e60e839c..a6e285b33d20 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -29,7 +29,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, ThreadUtils}
+import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}

 /**
  * Abstract Yarn scheduler backend that contains common logic
@@ -68,6 +68,12 @@ private[spark] abstract class YarnSchedulerBackend(
   // Flag to specify whether this schedulerBackend should be reset.
   private var shouldResetOnAmRegister = false

+  private val currentState = new CurrentAMState(0,
+    RequestExecutors(Utils.getDynamicAllocationInitialExecutors(conf), 0, Map.empty, Set.empty))
+
+  protected class CurrentAMState(
+      var executorIdCounter: Int,
+      var requestExecutors: RequestExecutors)
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -135,7 +141,20 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
-    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
+    val requestExecutors = prepareRequestExecutors(requestedTotal)
+    val future = yarnSchedulerEndpointRef.ask[Boolean](requestExecutors)
+    setCurrentRequestExecutors(requestExecutors)
+    future
+  }
+
+  override def setCurrentExecutorIdCounter(executorId: Int): Unit = synchronized {
+    if (currentState.executorIdCounter < executorId) {
+      currentState.executorIdCounter = executorId
+    }
+  }
+
+  def setCurrentRequestExecutors(requestExecutors: RequestExecutors): Unit = synchronized {
+    currentState.requestExecutors = requestExecutors
   }

   /**
@@ -305,8 +324,8 @@ private[spark] abstract class YarnSchedulerBackend(
             context.reply(false)
         }

-      case RetrieveLastAllocatedExecutorId =>
-        context.reply(currentExecutorIdCounter)
+      case GetAMInitialState =>
+        context.reply((currentState.executorIdCounter, currentState.requestExecutors))
     }

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 97b0e8aca333..3b6bdbcb3cfa 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -101,7 +101,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       appAttemptId,
       new SecurityManager(sparkConf),
       Map(),
-      new MockResolver())
+      new MockResolver(),
+      0)
   }

   def createContainer(host: String): Container = {
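
The handshake this patch introduces, end to end: on every registration the AM asks the
driver GetAMInitialState and gets back (executorIdCounter, requestExecutors), i.e. the
highest executor ID the driver has seen and the last RequestExecutors it forwarded. The
AM seeds YarnAllocator's counter with the former, so IDs minted after an AM restart
cannot collide with executors that are still alive (SPARK-12864), and replays the latter
when it differs from the allocator's default target, so a restarted AM does not fall
back to the initial executor count (SPARK-20079). The sketch below walks through that
handshake across a simulated AM restart. It is standalone illustration, not Spark code:
DriverState, Allocator, and their method names are hypothetical stand-ins for the RPC
endpoints, and RequestExecutors is cut down to its requestedTotal field.

// Standalone sketch of the GetAMInitialState handshake -- not Spark code. Plain
// classes stand in for the RPC endpoints; only the message shapes mirror the patch.
object AMInitialStateDemo {
  case class RequestExecutors(requestedTotal: Int)

  // Driver side: the state YarnSchedulerBackend keeps in CurrentAMState.
  class DriverState {
    private var executorIdCounter = 0
    private var lastRequest = RequestExecutors(requestedTotal = 2)

    // Mirrors setCurrentExecutorIdCounter: track the max executor ID registered so far.
    def executorRegistered(executorId: Int): Unit = synchronized {
      if (executorIdCounter < executorId) executorIdCounter = executorId
    }

    // Mirrors doRequestTotalExecutors: remember the last request forwarded to the AM.
    def requestTotal(total: Int): Unit = synchronized {
      lastRequest = RequestExecutors(total)
    }

    // Mirrors the GetAMInitialState reply: both values read under one lock.
    def amInitialState: (Int, RequestExecutors) = synchronized {
      (executorIdCounter, lastRequest)
    }
  }

  // AM side: the allocator is seeded with the driver's counter instead of starting at 0.
  class Allocator(private var executorIdCounter: Int, var targetNumExecutors: Int) {
    def nextExecutorId(): Int = { executorIdCounter += 1; executorIdCounter }
  }

  def main(args: Array[String]): Unit = {
    val driver = new DriverState

    // First AM attempt: three executors registered, then the app asked for 10 in total.
    Seq(1, 2, 3).foreach(driver.executorRegistered)
    driver.requestTotal(10)

    // Second AM attempt re-registers: fetch the initial state instead of starting
    // from (0, default target), which is what a restarted AM would otherwise do.
    val (counter, lastRequest) = driver.amInitialState
    val allocator = new Allocator(counter, targetNumExecutors = 2)
    if (lastRequest.requestedTotal != allocator.targetNumExecutors) {
      allocator.targetNumExecutors = lastRequest.requestedTotal // replay the last request
    }

    assert(allocator.nextExecutorId() == 4)    // 4, not 1: no ID collision
    assert(allocator.targetNumExecutors == 10) // the pre-restart target survives
  }
}

Bundling both values into a single message also means the driver's synchronized
CurrentAMState is read atomically at re-registration time, rather than through two
separate round trips that could interleave with a concurrent doRequestTotalExecutors.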