@@ -17,8 +17,9 @@

package org.apache.spark.storage

import java.util.concurrent.Semaphore
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._

@@ -28,29 +29,40 @@ import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
import org.apache.spark.util.{ResetSystemProperties, SystemClock, ThreadUtils}

class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
with ResetSystemProperties with Eventually {

val numExecs = 3
val numParts = 3
val TaskStarted = "TASK_STARTED"
val TaskEnded = "TASK_ENDED"
val JobEnded = "JOB_ENDED"

test(s"verify that an already running task which is going to cache data succeeds " +
s"on a decommissioned executor") {
runDecomTest(true, false, true)
s"on a decommissioned executor after task start") {
runDecomTest(true, false, TaskStarted)
}

test(s"verify that shuffle blocks are migrated") {
runDecomTest(false, true, false)
test(s"verify that an already running task which is going to cache data succeeds " +
s"on a decommissioned executor after one task ends but before job ends") {
runDecomTest(true, false, TaskEnded)
}

test(s"verify that both migrations can work at the same time.") {
runDecomTest(true, true, false)
test(s"verify that shuffle blocks are migrated") {
runDecomTest(false, true, JobEnded)
}

private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {
test(s"verify that both migrations can work at the same time") {
runDecomTest(true, true, JobEnded)
}

private def runDecomTest(
persist: Boolean,
shuffle: Boolean,
whenToDecom: String): Unit = {
val migrateDuring = whenToDecom != JobEnded
val master = s"local-cluster[${numExecs}, 1, 1024]"
val conf = new SparkConf().setAppName("test").setMaster(master)
.set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
@@ -61,6 +73,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
// workload we need to worry about.
.set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)

if (whenToDecom == TaskStarted) {
// We are using accumulators below, make sure those are reported frequently.
conf.set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "10ms")
}
sc = new SparkContext(master, "test", conf)

// Wait for the executors to start
@@ -70,15 +86,29 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext

val input = sc.parallelize(1 to numParts, numParts)
val accum = sc.longAccumulator("mapperRunAccumulator")
input.count()

val sleepIntervalMs = whenToDecom match {
// Increase the window of time b/w task started and ended so that we can decom within that.
case TaskStarted => 2000
// Make one task take a really short time so that we can decommission right after it is
// done but before its peers are done.
case TaskEnded =>
if (TaskContext.getPartitionId() == 0) {
100
} else {
1000
}
// No sleep otherwise
case _ => 0
}

// Create a new RDD where we have sleep in each partition, we are also increasing
// the value of accumulator in each partition
val baseRdd = input.mapPartitions { x =>
if (migrateDuring) {
Thread.sleep(1000)
}
accum.add(1)
if (sleepIntervalMs > 0) {
Thread.sleep(sleepIntervalMs)
}
x.map(y => (y, y))
}
val testRdd = shuffle match {
@@ -87,35 +117,46 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
}

// Listen for the job & block updates
val taskStartSem = new Semaphore(0)
val broadcastSem = new Semaphore(0)
val executorRemovedSem = new Semaphore(0)
val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
val executorsActuallyStarted = new ConcurrentHashMap[String, Boolean]()
val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
sc.addSparkListener(new SparkListener {

def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
executorsActuallyStarted.keySet().asScala.headOption
} else {
taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
}

sc.addSparkListener(new SparkListener {
override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
executorRemovedSem.release()
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
taskStartSem.release()
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
taskEndEvents.append(taskEnd)
taskEndEvents.add(taskEnd)
}

override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
// Once broadcast start landing on the executors we're good to proceed.
// We don't only use task start as it can occur before the work is on the executor.
if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
broadcastSem.release()
}
blocksUpdated.append(blockUpdated)
}
})

override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
val executorId = executorMetricsUpdate.execId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
val validUpdate = executorMetricsUpdate
.accumUpdates
.flatMap(_._4)
.exists { accumInfo =>
accumInfo.name == accum.name && accumInfo.update.exists(_.asInstanceOf[Long] >= 1)
}
if (validUpdate) {
executorsActuallyStarted.put(executorId, java.lang.Boolean.TRUE)
}
}
}
})

// Cache the RDD lazily
if (persist) {
@@ -125,28 +166,32 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
// Start the computation of RDD - this step will also cache the RDD
val asyncCount = testRdd.countAsync()

// Wait for the job to have started.
taskStartSem.acquire(1)
// Wait for each executor + driver to have it's broadcast info delivered.
broadcastSem.acquire((numExecs + 1))

// Make sure the job is either mid run or otherwise has data to migrate.
if (migrateDuring) {
// Give Spark a tiny bit to start executing after the broadcast blocks land.
// For me this works at 100, set to 300 for system variance.
Thread.sleep(300)
// Wait for one of the tasks to succeed and finish writing its blocks.
// This way we know that this executor had real data to migrate when it is subsequently
// decommissioned below.
Comment on lines +171 to +173
Contributor:
This goes against the purpose of this test: making sure that an executor with a running task that receives a decom has the block migrated. It is not migrating during decommissioning in the same way.

Author:
In which case, I don't follow how this is supposed to work in the first place. Consider the root cause: You decommission an executor while a (result) task is running. The block manager associated with the executor enters the decom state. The task tries to write the RDD persisted block. The block manager fails that because it is decom'd, thereby failing the task. The task reruns somewhere else and the job succeeds. Now you go to check if that executor has migrated the block: That check fails because the block was never written in the first place.

So I am relaxing the notion of "migrate during" to mean "migrate while the job is running". The question is which executor you want to decommission and force a migration off of. Picking the wrong executor, as above, can mean that no migration actually happens because no real blocks were written (or they were written and then discarded).

If that's the intent of the test then I think we need to change the block manager decommissioning (production) code to realize the intent.

Without this PR: This test is fundamentally racy and thus provides low signal. The race is between the task's end and the block manager's decommission: If the task ends successfully before the decom, the test passes. Otherwise the test fails.

Contributor:
So I think another way we could make sure the test covers what we want is to run a job repeatedly until all 3 executors come up.

The block manager (in decom state) does indeed refuse puts, but RDD computation on the executor goes through getOrElseUpdate, which immediately calls doPutIterator if there is no cache hit, before the iterator starts being computed. Since the check for whether the block manager is decommissioning occurs before the start of the computation, not at the end, we want to ensure that the block can be put (and then later migrated).
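
To make the sequencing concrete, here is a toy model of the behavior described above (a sketch only, not the actual BlockManager code; whether the real code path behaves this way is exactly what the rest of this thread debates): the decommission check fires when the put is registered, before the block's iterator is ever computed.

// Toy sketch only (not Spark source): a put that is registered before
// decommissioning starts is computed and cached; a put that arrives after
// decommissioning starts is rejected up front, before its iterator runs.
import scala.collection.mutable

class BlockSavedOnDecommissionedBlockManagerException(id: String)
  extends Exception(s"Block $id cannot be saved on a decommissioning block manager")

class ToyBlockManager {
  @volatile var decommissioning: Boolean = false
  private val cache = mutable.Map.empty[String, Seq[Int]]

  def getOrElseUpdate(id: String, makeIterator: () => Iterator[Int]): Seq[Int] =
    cache.getOrElse(id, {
      // The check happens here, before makeIterator() ever runs.
      if (decommissioning) {
        throw new BlockSavedOnDecommissionedBlockManagerException(id)
      }
      val computed = makeIterator().toSeq
      cache(id) = computed
      computed
    })
}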

Contributor:
Maybe we should add a comment where we do the isDecommissioning check to explain that it is intentionally done there so that we don't reject blocks which have already started computation. Do you think that would help?

Author:
> The block manager (in decom state) does indeed refuse puts, but RDD computation on the executor goes through getOrElseUpdate, which immediately calls doPutIterator if there is no cache hit, before the iterator starts being computed.

I don't see this. This is what I see: (BM stands for BlockManager)

  • RDD.getOrCompute -> BM.getOrElseUpdate -> BM.doPutIterator -> BM.doPut

BM.doPut throws BlockSavedOnDecommissionedBlockManagerException if isDecommissioning. And this fails the job. I am not sure what it should do instead off the top of my head since I am new to this codepath. But it is certainly not continuing on with the computation (as far as I can tell).

If the intent of the test is indeed to test decommissioning "Before the task has ended", I think then we need another BlockManager PR to actually realize it :-)

Contributor:
I don't believe we need another block manager PR to realize it; I think this is just a flaky test because we took out the original sleep and tried to use TestUtils, which doesn't do a good enough job of waiting for the executor to fully come up.

Since doPut is called before the task starts computation, we don't throw away any of the in-progress data.

I'll make an alternate PR to this one to illustrate my understanding and hopefully we can iron it out and make the code path clear for everyone :)

Author:
Cool! Looking forward to your alternate PR to fix this.

> I don't believe we need another block manager PR to realize it; I think this is just a flaky test because we took out the original sleep and tried to use TestUtils, which doesn't do a good enough job of waiting for the executor to fully come up.

I don't think the issue is that the executor does not come up. It does come up. The issue I think is that it tries to run the task and fails because it is decommissioned.

> Since doPut is called before the task starts computation, we don't throw away any of the in-progress data.

I agree that doPut is called before the task starts computation. I don't follow what you mean by "in-progress data"? Per my understanding, the task is simply failing before the iterator is even created.

Please check this for yourself by doing a decommission in the onTaskStart listener callback and giving the task a long enough sleep time that it is still running when the decommissioning happens.

> I'll make an alternate PR to this one to illustrate my understanding and hopefully we can iron it out and make the code path clear for everyone :)

Hurray :-)
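
For illustration, here is a hypothetical sketch of the check being suggested above (not part of this PR): it reuses the scheduler handle and decommissionExecutor call from the surrounding test, and assumes the mapped task sleeps long enough to still be running when the decommission arrives.

// Hypothetical sketch, not part of this PR: decommission the executor as soon
// as its task starts, while that task is still sleeping, so the put issued by
// the task reaches a block manager that is already decommissioning.
sc.addSparkListener(new SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
    sched.decommissionExecutor(
      taskStart.taskInfo.executorId,
      ExecutorDecommissionInfo("", isHostDecommissioned = false))
  }
})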

Author:
@holdenk, Please take a look at the PR once again. I have added another test to specifically capture the intent of decommissioning after the task has started but before the task has ended.

val intervalMs = if (whenToDecom == TaskStarted) {
3.milliseconds
} else {
10.milliseconds
}
eventually(timeout(6.seconds), interval(intervalMs)) {
assert(getCandidateExecutorToDecom.isDefined)
}
} else {
ThreadUtils.awaitResult(asyncCount, 15.seconds)
}

// Decommission one of the executors.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")

val execToDecommission = execs.head
logDebug(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
val execToDecommission = getCandidateExecutorToDecom.get
logInfo(s"Decommissioning executor ${execToDecommission}")
sched.decommissionExecutor(
execToDecommission,
ExecutorDecommissionInfo("", isHostDecommissioned = false))
val decomTime = new SystemClock().getTimeMillis()

// Wait for job to finish.
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -155,16 +200,31 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
assert(accum.value === numParts)

sc.listenerBus.waitUntilEmpty()
val taskEndEventsCopy = taskEndEvents.asScala
if (shuffle) {
// mappers & reducers which succeeded
assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
assert(taskEndEventsCopy.count(_.reason == Success) === 2 * numParts,
s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
} else {
// only mappers which executed successfully
assert(taskEndEvents.count(_.reason == Success) === numParts,
assert(taskEndEventsCopy.count(_.reason == Success) === numParts,
s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
}

val minTaskEndTime = taskEndEventsCopy.map(_.taskInfo.finishTime).min
val maxTaskEndTime = taskEndEventsCopy.map(_.taskInfo.finishTime).max

// Verify that the decom time matched our expectations
val decomAssertMsg = s"$whenToDecom: decomTime: $decomTime, minTaskEnd: $minTaskEndTime," +
s" maxTaskEnd: $maxTaskEndTime"
assert(minTaskEndTime <= maxTaskEndTime, decomAssertMsg)
whenToDecom match {
case TaskStarted => assert(minTaskEndTime > decomTime, decomAssertMsg)
case TaskEnded => assert(minTaskEndTime <= decomTime &&
decomTime < maxTaskEndTime, decomAssertMsg)
case JobEnded => assert(maxTaskEndTime <= decomTime, decomAssertMsg)
}

// Wait for our respective blocks to have migrated
eventually(timeout(30.seconds), interval(10.milliseconds)) {
if (persist) {
@@ -224,6 +284,5 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
// should have same value like before
assert(testRdd.count() === numParts)
assert(accum.value === numParts)

}
}