CoarseGrainedClusterMessages.scala
@@ -30,6 +30,8 @@ private[spark] object CoarseGrainedClusterMessages {

   case object RetrieveSparkProps extends CoarseGrainedClusterMessage
 
+  case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
+
   // Driver to executors
   case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
 
CoarseGrainedSchedulerBackend.scala
@@ -78,6 +78,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
+  // The current max executor ID, used when the AM re-registers.
+  protected var currentExecutorIdCounter = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -155,6 +158,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
           executorDataMap.put(executorId, data)
+          if (currentExecutorIdCounter < executorId.toInt) {
+            currentExecutorIdCounter = executorId.toInt
+          }

andrewor14 (Contributor):
this is kind of awkward. You don't need to keep track of another variable; just compute the max executor ID when the AM asks for it. You already have all the information you need in executorDataMap.
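A minimal sketch of that suggestion, assuming `executorDataMap`'s keys are the numeric executor ID strings this backend registers (a hypothetical helper, not part of the PR; the replies below explain why it falls short):

```scala
// Hypothetical helper: derive the max ID on demand instead of tracking
// currentExecutorIdCounter separately. Breaks once executors have been
// removed from executorDataMap (see the discussion below).
private def maxRegisteredExecutorId: Int =
  if (executorDataMap.isEmpty) 0
  else executorDataMap.keys.map(_.toInt).max
```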

Author (Contributor):
@andrewor14 Thanks for the review. As I understand it, we can't get the max executor ID from executorDataMap: when the AM fails, all executors are disconnected and removed, and as the code in CoarseGrainedSchedulerBackend.removeExecutor shows, their entries are removed from executorDataMap as well. So the executor information in executorDataMap is not complete. What do you think?
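For reference, a simplified sketch of the behavior described above (paraphrased, not verbatim Spark code): `removeExecutor` drops the executor's entry, so after an AM failure the map no longer records the highest ID ever allocated.

```scala
// Simplified paraphrase of CoarseGrainedSchedulerBackend.removeExecutor;
// resource cleanup and TaskScheduler notification elided.
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
  executorDataMap.get(executorId) match {
    case Some(executorInfo) =>
      CoarseGrainedSchedulerBackend.this.synchronized {
        executorDataMap -= executorId // the max known ID can vanish here
      }
    case None =>
      logInfo(s"Asked to remove non-existent executor $executorId")
  }
}
```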

vanzin (Contributor):
yes, with dynamic allocation, for example, the executor with the max known id may be gone already.

minor: executorId.toInt instead of Integer.parseInt
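(For context on the nit: Scala's `StringOps.toInt` is simply the idiomatic spelling and parses the same way, e.g.:)

```scala
val executorId = "42"
assert(executorId.toInt == Integer.parseInt(executorId)) // both yield 42
// both throw NumberFormatException for non-numeric IDs
```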

Author (Contributor):
@vanzin Thanks for your comments. I will optimize it.

           if (numPendingExecutors > 0) {
             numPendingExecutors -= 1
             logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
YarnAllocator.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
 import org.apache.spark.util.ThreadUtils

 /**

@@ -81,8 +82,23 @@ private[yarn] class YarnAllocator(
     new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
   @volatile private var numExecutorsRunning = 0
-  // Used to generate a unique ID per executor
-  private var executorIdCounter = 0
 
+  /**
+   * Used to generate a unique ID per executor.
+   *
+   * If the AM restarts, `executorIdCounter` is reset to 0 and IDs for new executors
+   * would start from 1 again, conflicting with executors created before the restart.
+   * So we initialize `executorIdCounter` with the max executor ID known to the driver.
+   *
+   * This conflict only arises in YARN client mode, where the driver is not running
+   * on YARN and survives the AM restart. See the JIRA for more details.
+   *
+   * @see SPARK-12864
+   */
+  private var executorIdCounter: Int =
+    driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)

tgravescs (Contributor):
I think we need to clarify this comment to say it is required for client mode, where the driver isn't running on YARN. This isn't an issue in cluster mode.

Author (Contributor):
@tgravescs I think we can clarify this in the SPARK-12864 issue. @andrewor14 What's your opinion?

tgravescs (Contributor):
I would prefer to do it here in this comment, to better describe the situation where this is needed. It should be a line or two, and I personally much prefer that to pointing at JIRAs, unless a big discussion or background is required, in which case the JIRA makes more sense.

Author (Contributor):
@tgravescs OK, I will do it soon. Thanks a lot.

   @volatile private var numExecutorsFailed = 0
 
   @volatile private var targetNumExecutors =
YarnSchedulerBackend.scala
@@ -287,6 +287,9 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning("Attempted to kill executors before the AM has registered!")
context.reply(false)
}

case RetrieveLastAllocatedExecutorId =>
context.reply(currentExecutorIdCounter)
}

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
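Taken together, the changes form a small handshake: the driver tracks the highest executor ID it has ever registered, and a restarted AM seeds its counter from that value instead of 0. A self-contained toy model of that flow (plain Scala, not Spark code; the class and method names only mirror the PR's):

```scala
object ExecutorIdHandshakeDemo {
  // Stands in for CoarseGrainedSchedulerBackend on the driver side.
  class Driver {
    private var currentExecutorIdCounter = 0
    def registerExecutor(executorId: String): Unit = synchronized {
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
    }
    // Stands in for the RetrieveLastAllocatedExecutorId RPC.
    def retrieveLastAllocatedExecutorId: Int = synchronized {
      currentExecutorIdCounter
    }
  }

  // Stands in for YarnAllocator in the AM.
  class Allocator(driver: Driver) {
    private var executorIdCounter = driver.retrieveLastAllocatedExecutorId
    def nextExecutorId(): String = {
      executorIdCounter += 1
      executorIdCounter.toString
    }
  }

  def main(args: Array[String]): Unit = {
    val driver = new Driver
    val am1 = new Allocator(driver)
    Seq.fill(3)(am1.nextExecutorId()).foreach(driver.registerExecutor) // IDs 1, 2, 3
    // AM restarts: without the handshake the new allocator would hand out "1"
    // again; seeded from the driver, it continues where am1 left off.
    val am2 = new Allocator(driver)
    println(am2.nextExecutorId()) // prints 4
  }
}
```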