diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index f3d0d85476772..1d92efab84d61 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object RetrieveSparkProps extends CoarseGrainedClusterMessage
 
+  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
+
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b808993aa6cd3..6c06e1909b614 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -78,6 +78,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
+  // The current max executor ID, used to seed a re-registering AM's ID counter
+  protected var currentExecutorIdCounter = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -155,6 +158,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
+          if (currentExecutorIdCounter < executorId.toInt) {
+            currentExecutorIdCounter = executorId.toInt
+          }
           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 11426eb07c7ed..8a50c1884e711 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -81,8 +82,23 @@ private[yarn] class YarnAllocator(
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
   @volatile private var numExecutorsRunning = 0
-  // Used to generate a unique ID per executor
-  private var executorIdCounter = 0
+
+  /**
+   * Used to generate a unique ID per executor.
+   *
+   * If the AM restarts, `executorIdCounter` would reset to 0, so the IDs of newly launched
+   * executors would start from 1 again and conflict with the executors created before the
+   * restart. To avoid the conflict, initialize `executorIdCounter` with the max executor ID
+   * known to the driver.
+   *
+   * This executor ID conflict can only happen in yarn-client mode, where the driver (and its
+   * `currentExecutorIdCounter`) outlives a restarted AM. For more details, see the JIRA ticket.
+   *
+   * @see SPARK-12864
+   */
+  private var executorIdCounter: Int =
+    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+
   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 1431bceb256a7..77f96501dbb16 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -287,6 +287,9 @@ private[spark] abstract class YarnSchedulerBackend(
           logWarning("Attempted to kill executors before the AM has registered!")
           context.reply(false)
         }
+
+      case RetrieveLastAllocatedExecutorId =>
+        context.reply(currentExecutorIdCounter)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
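
A minimal sketch of the handoff this patch implements, using hypothetical `Driver` and `Allocator` stand-ins instead of Spark's actual RPC endpoints (all names below are illustrative, not Spark APIs): the driver remembers the highest executor ID ever registered, and a freshly constructed allocator, as after an AM restart, seeds its counter from the driver instead of starting at 0.

```scala
// Illustrative stand-ins only; the real classes are CoarseGrainedSchedulerBackend
// and YarnAllocator, which communicate over RPC rather than direct method calls.
object ExecutorIdSeedSketch {

  // Plays the role of the driver: in yarn-client mode it outlives AM restarts
  // and tracks the max executor ID ever registered (currentExecutorIdCounter).
  class Driver {
    private var currentExecutorIdCounter = 0

    def register(executorId: String): Unit = synchronized {
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
    }

    // Plays the role of replying to RetrieveLastAllocatedExecutorId.
    def lastAllocatedExecutorId: Int = synchronized { currentExecutorIdCounter }
  }

  // Plays the role of YarnAllocator: a new instance is built whenever an AM starts.
  class Allocator(driver: Driver) {
    // The fix in this patch: seed from the driver instead of starting at 0.
    private var executorIdCounter = driver.lastAllocatedExecutorId

    def nextExecutorId(): String = {
      executorIdCounter += 1
      val id = executorIdCounter.toString
      driver.register(id)
      id
    }
  }

  def main(args: Array[String]): Unit = {
    val driver = new Driver
    val beforeRestart = {
      val am1 = new Allocator(driver)
      Seq.fill(3)(am1.nextExecutorId()) // "1", "2", "3"
    }
    val am2 = new Allocator(driver)     // simulated AM restart
    val afterRestart = am2.nextExecutorId()
    assert(!beforeRestart.contains(afterRestart)) // "4": no collision
    println(s"before restart: $beforeRestart, after restart: $afterRestart")
  }
}
```

Without the seed, the second allocator would start its counter at 0 again and hand out "1", colliding with an executor that is still registered with the driver from before the restart.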