
Commit a292c49

GraceH authored and squito committed
[SPARK-9193] Avoid assigning tasks to "lost" executor(s)
Now, when some executors are killed by dynamic-allocation, it leads to some mis-assignment onto lost executors sometimes. Such kind of mis-assignment causes task failure(s) or even job failure if it repeats that errors for 4 times. The root cause is that ***killExecutors*** doesn't remove those executors under killing ASAP. It depends on the ***OnDisassociated*** event to refresh the active working list later. The delay time really depends on your cluster status (from several milliseconds to sub-minute). When new tasks to be scheduled during that period of time, it will be assigned to those "active" but "under killing" executors. Then the tasks will be failed due to "executor lost". The better way is to exclude those executors under killing in the makeOffers(). Then all those tasks won't be allocated onto those executors "to be lost" any more. Author: Grace <[email protected]> Closes #7528 from GraceH/AssignToLostExecutor and squashes the following commits: ecc1da6 [Grace] scala style fix 6e2ed96 [Grace] Re-word makeOffers by more readable lines b5546ce [Grace] Add comments about the fix 30a9ad0 [Grace] Avoid assigning tasks to lost executors (cherry picked from commit 6592a60) Signed-off-by: Imran Rashid <[email protected]> Conflicts: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
1 parent 1782c0e commit a292c49

File tree: 1 file changed (+14, -7 lines)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 14 additions & 7 deletions
@@ -163,10 +163,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // Make fake resource offers on all executors
-    def makeOffers() {
-      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
+    private def makeOffers() {
+      // Filter out executors under killing
+      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+      val workOffers = activeExecutors.map { case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
-      }.toSeq))
+      }.toSeq
+      launchTasks(scheduler.resourceOffers(workOffers))
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -175,10 +178,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // Make fake resource offers on just one executor
-    def makeOffers(executorId: String) {
-      val executorData = executorDataMap(executorId)
-      launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
+    private def makeOffers(executorId: String) {
+      // Filter out executors under killing
+      if (!executorsPendingToRemove.contains(executorId)) {
+        val executorData = executorDataMap(executorId)
+        val workOffers = Seq(
+          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
+        launchTasks(scheduler.resourceOffers(workOffers))
+      }
     }
 
     // Launch tasks returned by a set of resource offers
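The effect of the filter can be illustrated with a small standalone sketch. This is hypothetical code, not Spark itself: the names WorkerOffer, executorDataMap, and executorsPendingToRemove mirror the fields in CoarseGrainedSchedulerBackend, but the data and the MakeOffersSketch object are invented for illustration.

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's WorkerOffer (hypothetical sketch).
case class WorkerOffer(executorId: String, host: String, freeCores: Int)

object MakeOffersSketch {
  // Executors the backend still tracks as registered: id -> (host, free cores).
  val executorDataMap = mutable.HashMap(
    "exec-1" -> ("host-a", 4),
    "exec-2" -> ("host-b", 4))

  // Executors for which a kill was requested but whose disassociation
  // event has not arrived yet -- still "registered" but about to be lost.
  val executorsPendingToRemove = mutable.HashSet("exec-2")

  // Build offers only from executors that are not pending removal,
  // mirroring the filter this commit adds to makeOffers().
  def makeOffers(): Seq[WorkerOffer] =
    executorDataMap
      .filter { case (id, _) => !executorsPendingToRemove.contains(id) }
      .map { case (id, (host, cores)) => WorkerOffer(id, host, cores) }
      .toSeq

  def main(args: Array[String]): Unit = {
    // exec-2 is under killing, so only exec-1 receives an offer.
    println(makeOffers().map(_.executorId).mkString(","))
  }
}
```

Without the filter, exec-2 would appear in the offers during the window between killExecutors and the disassociation event, and any task launched on it would fail with "executor lost".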
