Commit 2cea5f4

Make the block manager decommissioning test be less flaky
An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and then decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because the block manager won't be able to save the block. This condition is easy to trigger on a loaded machine where the GitHub checks run.
- The task will retry on a different executor (1 or 2) and its shuffle blocks will land there.
- No actual block migration happens here because the decommissioned executor technically failed before it could even produce a block.

So this change makes two fixes to remove the above race condition:
- When migrateDuring = true, wait for a task to complete and write its block, and then decommission that executor.
- When migrateDuring = false, it is still possible (because of delay scheduling) for two tasks to run serially on the same executor while another executor stays idle. In that case, we must make sure to decommission an executor that actually had a task run on it.
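At its core the fix funnels every decommission request through one compare-and-set guard and triggers it from the task-end listener only once a task has succeeded on the candidate executor. A condensed excerpt of that logic (lifted from the diff below, so the names sched, migrateDuring, maybeDoDecommission and taskEndEvents come from the test itself and this is only an outline, not the full change):

// Condensed from the diff below: a single AtomicReference guards the choice of
// executor, so exactly one executor gets decommissioned no matter how many
// task-end events race to pick one.
val execToDecommission = new AtomicReference[String](null)

def maybeDoDecommission(candidateExecToDecom: String): Boolean = {
  if (execToDecommission.compareAndSet(null, candidateExecToDecom)) {
    sched.decommissionExecutor(candidateExecToDecom,
      ExecutorDecommissionInfo(s"Decommissioning executor ${candidateExecToDecom}", false))
    true
  } else {
    false
  }
}

// Inside the test's SparkListener: when migrateDuring = true, decommission an
// executor only after it has finished a task successfully, so its block exists
// and can actually be migrated.
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
  taskEndEvents.append(taskEnd)
  if (migrateDuring && taskEnd.taskInfo.successful) {
    maybeDoDecommission(taskEnd.taskInfo.executorId)
  }
}

The migrateDuring = false path reuses the same guard after the job finishes, picking an executor that actually ran a successful task, as the diff below shows.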
1 parent d1301af commit 2cea5f4


core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala

Lines changed: 43 additions & 13 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
@@ -92,8 +93,22 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val executorRemovedSem = new Semaphore(0)
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
-    sc.addSparkListener(new SparkListener {
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execToDecommission = new AtomicReference[String](null)
+
+    def maybeDoDecommission(candidateExecToDecom: String): Boolean = {
+      if (execToDecommission.compareAndSet(null, candidateExecToDecom)) {
+        val decomContext = s"Decommissioning executor ${candidateExecToDecom}"
+        logInfo(decomContext)
+        sched.decommissionExecutor(candidateExecToDecom,
+          ExecutorDecommissionInfo(decomContext, false))
+        true
+      } else {
+        false
+      }
+    }
 
+    sc.addSparkListener(new SparkListener {
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
       }
@@ -104,6 +119,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         taskEndEvents.append(taskEnd)
+        if (migrateDuring && taskEnd.taskInfo.successful) {
+          // When migrating during, we decommission an executor _after_ it is done with a task.
+          // We don't decommission an executor while it is running, because doing so might cause
+          // a task to fail. The task fails because a decommissioned block manager cannot save
+          // blocks. When this happens the task will be re-run on a different executor, but by
+          // which time we have already decommissioned a "wrong" executor. To avoid this condition,
+          // we wait for a task to finish successfully and go and decommission its executor.
+          maybeDoDecommission(taskEnd.taskInfo.executorId)
+        }
       }
 
       override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
@@ -139,22 +163,28 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       ThreadUtils.awaitResult(asyncCount, 15.seconds)
     }
 
-    // Decommission one of the executors.
-    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
-    val execs = sched.getExecutorIds()
-    assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
+    sc.listenerBus.waitUntilEmpty()
 
-    val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    val asyncCountResult = if (migrateDuring) {
+      // Wait for the job to finish
+      ThreadUtils.awaitResult(asyncCount, 15.seconds)
+    } else {
+      // (See comment in listener's onTaskEnd method.)
+      // But when migrating after a job is done, we can decommission any executor that had a task
+      val executorWithSuccessfulTask = taskEndEvents
+        .filter(_.taskInfo.successful)
+        .map(_.taskInfo.executorId)
+        .head
+      val decommissioned = maybeDoDecommission(executorWithSuccessfulTask)
+      assert(decommissioned, s"No prior decommissioning should have happened")
+      // We had already waited for the job to finish above, so the timeout is zero
+      ThreadUtils.awaitResult(asyncCount, 0.seconds)
+    }
 
-    // Wait for job to finish.
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
     assert(asyncCountResult === numParts)
     // All tasks finished, so accum should have been increased numParts times.
     assert(accum.value === numParts)
 
-    sc.listenerBus.waitUntilEmpty()
     if (shuffle) {
       // mappers & reducers which succeeded
       assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
@@ -206,15 +236,15 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on executor which was decommissioned
-    assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
+    assert(execIdToBlocksMapping(execToDecommission.get()).keys.filter(_.isRDD).toSeq === Seq(),
       "Cache blocks should be migrated")
     if (persist) {
       // There should still be all the RDD blocks cached
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
     }
 
     // Make the executor we decommissioned exit
-    sched.client.killExecutors(List(execToDecommission))
+    sched.client.killExecutors(List(execToDecommission.get()))
 
     // Wait for the executor to be removed
     executorRemovedSem.acquire(1)
