diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 57410103dd08..6a52f72938c6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.storage
 
-import java.util.concurrent.Semaphore
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 
@@ -28,29 +29,40 @@ import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.util.{ResetSystemProperties, ThreadUtils}
+import org.apache.spark.util.{ResetSystemProperties, SystemClock, ThreadUtils}
 
 class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalSparkContext
     with ResetSystemProperties with Eventually {
 
   val numExecs = 3
   val numParts = 3
+  val TaskStarted = "TASK_STARTED"
+  val TaskEnded = "TASK_ENDED"
+  val JobEnded = "JOB_ENDED"
 
   test(s"verify that an already running task which is going to cache data succeeds " +
-    s"on a decommissioned executor") {
-    runDecomTest(true, false, true)
+    s"on a decommissioned executor after task start") {
+    runDecomTest(true, false, TaskStarted)
   }
 
-  test(s"verify that shuffle blocks are migrated") {
-    runDecomTest(false, true, false)
+  test(s"verify that an already running task which is going to cache data succeeds " +
+    s"on a decommissioned executor after one task ends but before job ends") {
+    runDecomTest(true, false, TaskEnded)
   }
 
-  test(s"verify that both migrations can work at the same time.") {
-    runDecomTest(true, true, false)
+  test(s"verify that shuffle blocks are migrated") {
+    runDecomTest(false, true, JobEnded)
   }
-
-  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {
+
+  test(s"verify that both migrations can work at the same time") {
+    runDecomTest(true, true, JobEnded)
+  }
+
+  private def runDecomTest(
+      persist: Boolean,
+      shuffle: Boolean,
+      whenToDecom: String): Unit = {
+    val migrateDuring = whenToDecom != JobEnded
     val master = s"local-cluster[${numExecs}, 1, 1024]"
     val conf = new SparkConf().setAppName("test").setMaster(master)
       .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
@@ -61,6 +73,10 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
 
+    if (whenToDecom == TaskStarted) {
+      // We are using accumulators below, make sure those are reported frequently.
+      conf.set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "10ms")
+    }
     sc = new SparkContext(master, "test", conf)
 
     // Wait for the executors to start
@@ -70,15 +86,29 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
 
     val input = sc.parallelize(1 to numParts, numParts)
     val accum = sc.longAccumulator("mapperRunAccumulator")
-    input.count()
+
+    val sleepIntervalMs = whenToDecom match {
+      // Increase the window of time b/w task started and ended so that we can decom within that.
+      case TaskStarted => 2000
+      // Make one task take a really short time so that we can decommission right after it is
+      // done but before its peers are done.
+      case TaskEnded =>
+        if (TaskContext.getPartitionId() == 0) {
+          100
+        } else {
+          1000
+        }
+      // No sleep otherwise
+      case _ => 0
+    }
 
     // Create a new RDD where we have sleep in each partition, we are also increasing
     // the value of accumulator in each partition
     val baseRdd = input.mapPartitions { x =>
-      if (migrateDuring) {
-        Thread.sleep(1000)
-      }
       accum.add(1)
+      if (sleepIntervalMs > 0) {
+        Thread.sleep(sleepIntervalMs)
+      }
       x.map(y => (y, y))
     }
     val testRdd = shuffle match {
@@ -87,35 +117,46 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     }
 
     // Listen for the job & block updates
-    val taskStartSem = new Semaphore(0)
-    val broadcastSem = new Semaphore(0)
     val executorRemovedSem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    val taskEndEvents = new ConcurrentLinkedQueue[SparkListenerTaskEnd]()
+    val executorsActuallyStarted = new ConcurrentHashMap[String, Boolean]()
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
-    sc.addSparkListener(new SparkListener {
 
+    def getCandidateExecutorToDecom: Option[String] = if (whenToDecom == TaskStarted) {
+      executorsActuallyStarted.keySet().asScala.headOption
+    } else {
+      taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
+    }
+
+    sc.addSparkListener(new SparkListener {
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
        executorRemovedSem.release()
      }
 
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-        taskStartSem.release()
-      }
-
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-        taskEndEvents.append(taskEnd)
+        taskEndEvents.add(taskEnd)
       }
 
       override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
-        // Once broadcast start landing on the executors we're good to proceed.
-        // We don't only use task start as it can occur before the work is on the executor.
-        if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
-          broadcastSem.release()
-        }
         blocksUpdated.append(blockUpdated)
       }
 
-    })
+      override def onExecutorMetricsUpdate(
+          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+        val executorId = executorMetricsUpdate.execId
+        if (executorId != SparkContext.DRIVER_IDENTIFIER) {
+          val validUpdate = executorMetricsUpdate
+            .accumUpdates
+            .flatMap(_._4)
+            .exists { accumInfo =>
+              accumInfo.name == accum.name && accumInfo.update.exists(_.asInstanceOf[Long] >= 1)
+            }
+          if (validUpdate) {
+            executorsActuallyStarted.put(executorId, java.lang.Boolean.TRUE)
+          }
+        }
+      }
+    })
 
     // Cache the RDD lazily
     if (persist) {
@@ -125,28 +166,32 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started.
-    taskStartSem.acquire(1)
-    // Wait for each executor + driver to have it's broadcast info delivered.
-    broadcastSem.acquire((numExecs + 1))
-
     // Make sure the job is either mid run or otherwise has data to migrate.
     if (migrateDuring) {
-      // Give Spark a tiny bit to start executing after the broadcast blocks land.
-      // For me this works at 100, set to 300 for system variance.
-      Thread.sleep(300)
+      // Wait for one of the tasks to succeed and finish writing its blocks.
+      // This way we know that this executor had real data to migrate when it is subsequently
+      // decommissioned below.
+      val intervalMs = if (whenToDecom == TaskStarted) {
+        3.milliseconds
+      } else {
+        10.milliseconds
+      }
+      eventually(timeout(6.seconds), interval(intervalMs)) {
+        assert(getCandidateExecutorToDecom.isDefined)
+      }
     } else {
       ThreadUtils.awaitResult(asyncCount, 15.seconds)
     }
 
     // Decommission one of the executors.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
-    val execs = sched.getExecutorIds()
-    assert(execs.size == numExecs, s"Expected ${numExecs} executors but found ${execs.size}")
 
-    val execToDecommission = execs.head
-    logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
+    val execToDecommission = getCandidateExecutorToDecom.get
+    logInfo(s"Decommissioning executor ${execToDecommission}")
+    sched.decommissionExecutor(
+      execToDecommission,
+      ExecutorDecommissionInfo("", isHostDecommissioned = false))
+    val decomTime = new SystemClock().getTimeMillis()
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
@@ -155,16 +200,31 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     assert(accum.value === numParts)
 
     sc.listenerBus.waitUntilEmpty()
+    val taskEndEventsCopy = taskEndEvents.asScala
     if (shuffle) {
       // mappers & reducers which succeeded
-      assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+      assert(taskEndEventsCopy.count(_.reason == Success) === 2 * numParts,
        s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
     } else {
      // only mappers which executed successfully
-      assert(taskEndEvents.count(_.reason == Success) === numParts,
+      assert(taskEndEventsCopy.count(_.reason == Success) === numParts,
        s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
     }
 
+    val minTaskEndTime = taskEndEventsCopy.map(_.taskInfo.finishTime).min
+    val maxTaskEndTime = taskEndEventsCopy.map(_.taskInfo.finishTime).max
+
+    // Verify that the decom time matched our expectations
+    val decomAssertMsg = s"$whenToDecom: decomTime: $decomTime, minTaskEnd: $minTaskEndTime," +
+      s" maxTaskEnd: $maxTaskEndTime"
+    assert(minTaskEndTime <= maxTaskEndTime, decomAssertMsg)
+    whenToDecom match {
+      case TaskStarted => assert(minTaskEndTime > decomTime, decomAssertMsg)
+      case TaskEnded => assert(minTaskEndTime <= decomTime &&
+        decomTime < maxTaskEndTime, decomAssertMsg)
+      case JobEnded => assert(maxTaskEndTime <= decomTime, decomAssertMsg)
+    }
+
     // Wait for our respective blocks to have migrated
     eventually(timeout(30.seconds), interval(10.milliseconds)) {
       if (persist) {
@@ -224,6 +284,5 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // should have same value like before
     assert(testRdd.count() === numParts)
     assert(accum.value === numParts)
-
   }
 }