diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c8c6e5a192a2..b7a64d75a8d4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -165,8 +165,6 @@ private[deploy] object DeployMessages { case object ReregisterWithMaster // used when a worker attempts to reconnect to a master - case object DecommissionSelf // Mark as decommissioned. May be Master to Worker in the future. - // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index aa8c46fc6831..862e685c2dce 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -668,7 +668,7 @@ private[deploy] class Worker( finishedApps += id maybeCleanupApplication(id) - case DecommissionSelf => + case WorkerDecommission(_, _) => decommissionSelf() } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index def125bb6bfb..55fb76b3572a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend( private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null - @volatile private var decommissioned = false @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need @@ -80,6 +79,8 @@ private[spark] class CoarseGrainedExecutorBackend( */ private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]] + @volatile private var decommissioned = false + override def onStart(): Unit = { logInfo("Registering PWR handler.") SignalUtils.register("PWR", "Failed to register SIGPWR handler - " + @@ -214,6 +215,10 @@ private[spark] class CoarseGrainedExecutorBackend( case UpdateDelegationTokens(tokenBytes) => logInfo(s"Received tokens of ${tokenBytes.length} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) + + case DecommissionSelf => + logInfo("Received decommission self") + decommissionSelf() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { @@ -277,12 +282,59 @@ private[spark] class CoarseGrainedExecutorBackend( if (executor != null) { executor.decommission() } - logInfo("Done decommissioning self.") + // Shutdown the executor once all tasks are gone & any configured migrations completed. + // Detecting migrations completion doesn't need to be perfect and we want to minimize the + // overhead for executors that are not in decommissioning state as overall that will be + // more of the executors. For example, this will not catch a block which is already in + // the process of being put from a remote executor before migration starts. This trade-off + // is viewed as acceptable to minimize introduction of any new locking structures in critical + // code paths. 
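For readers skimming this hunk, the polling thread added below boils down to a single exit predicate; a condensed sketch (not part of the patch, names mirror the surrounding CoarseGrainedExecutorBackend code):

    // Sketch only: the exit decision evaluated by the "wait-for-blocks-to-migrate" loop.
    private def readyToExit(lastTaskRunningTime: Long): Boolean = {
      if (executor != null && executor.numRunningTasks != 0) {
        false // still running tasks, keep waiting
      } else if (!env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
        true // idle and no block migration configured, so exit immediately
      } else {
        val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
        // Only trust allBlocksMigrated if no task has run since that migration pass started.
        allBlocksMigrated && migrationTime > lastTaskRunningTime
      }
    }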
+ + val shutdownThread = new Thread("wait-for-blocks-to-migrate") { + override def run(): Unit = { + var lastTaskRunningTime = System.nanoTime() + val sleep_time = 1000 // 1s + + while (true) { + logInfo("Checking to see if we can shutdown.") + Thread.sleep(sleep_time) + if (executor == null || executor.numRunningTasks == 0) { + if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) { + logInfo("No running tasks, checking migrations") + val (migrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo() + // We can only trust the allBlocksMigrated boolean value if there were no tasks running + // since the start of computing it. + if (allBlocksMigrated && (migrationTime > lastTaskRunningTime)) { + logInfo("No running tasks, all blocks migrated, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } else { + logInfo("All blocks not yet migrated.") + } + } else { + logInfo("No running tasks, no block migration configured, stopping.") + exitExecutor(0, "Finished decommissioning", notifyDriver = true) + } + } else { + logInfo(s"Blocked from shutdown by ${executor.numRunningTasks} running tasks") + // If there is a running task it could store blocks, so make sure we wait for a + // migration loop to complete after the last task is done. + // Note: this is only advanced if there is a running task; if there + // is no running task but the blocks are not done migrating, this does not + // move forward. + lastTaskRunningTime = System.nanoTime() + } + } + } + } + shutdownThread.setDaemon(true) + shutdownThread.start() + + logInfo("Will exit when finished decommissioning") // Return true since we are handling a signal true } catch { case e: Exception => - logError(s"Error ${e} during attempt to decommission self") + logError("Unexpected error while decommissioning self", e) false } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 91485f01bf00..7242ab778606 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages { // The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not. case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage + + // Used to ask an executor to decommission itself. (Can be an internal message) + case object DecommissionSelf extends CoarseGrainedClusterMessage } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 8fbefae58af1..d81a617d0ed7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case e: Exception => logError(s"Unexpected error during decommissioning ${e.toString}", e) } + // Send a decommission message to the executor; this may be a duplicate, since the executor + // could have been the one to notify us. But it's also possible the notification came from + // elsewhere and the executor does not yet know.
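For context, the block below is reached from the driver-side decommissionExecutor entry point; a minimal sketch of how the test suites later in this patch invoke it (the cast to StandaloneSchedulerBackend is an assumption taken from the standalone-mode tests, not shown in this hunk):

    // Sketch only: asking the scheduler backend to decommission an executor; the backend
    // then forwards DecommissionSelf to that executor's endpoint, as in the code below.
    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
    sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))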
+ executorDataMap.get(executorId) match { + case Some(executorInfo) => + executorInfo.executorEndpoint.send(DecommissionSelf) + case None => + // Ignoring the executor since it is not registered. + logWarning(s"Attempted to decommission unknown executor $executorId.") + } logInfo(s"Finished decommissioning executor $executorId.") if (conf.get(STORAGE_DECOMMISSION_ENABLED)) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 47af854b6e8f..6ec93df67f7d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1822,6 +1822,14 @@ private[spark] class BlockManager( } } + /* + * Returns the last migration time and a boolean denoting if all the blocks have been migrated. + * If any tasks have been running since that time, the boolean may be incorrect. + */ + private[spark] def lastMigrationInfo(): (Long, Boolean) = { + decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false)) + } + private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] = master.getReplicateInfoForRDDBlocks(blockManagerId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 1cc7ef6a25f9..f0a8e47aa320 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -18,6 +18,7 @@ package org.apache.spark.storage import java.util.concurrent.ExecutorService +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner( private val maxReplicationFailuresForDecommission = conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK) + // Used for tracking if our migrations are complete. Readable for testing. + @volatile private[storage] var lastRDDMigrationTime: Long = 0 + @volatile private[storage] var lastShuffleMigrationTime: Long = 0 + @volatile private[storage] var rddBlocksLeft: Boolean = true + @volatile private[storage] var shuffleBlocksLeft: Boolean = true + /** * This runnable consumes any shuffle blocks in the queue for migration. This part of a * producer/consumer where the main migration loop updates the queue of blocks to be migrated @@ -91,10 +98,11 @@ private[storage] class BlockManagerDecommissioner( null)// class tag, we don't need for shuffle logDebug(s"Migrated sub block ${blockId}") } - logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}") + logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}") } else { logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}") } + numMigratedShuffles.incrementAndGet() } } // This catch is intentionally outside of the while running block. @@ -115,12 +123,21 @@ private[storage] class BlockManagerDecommissioner( // Shuffles which are either in queue for migrations or migrated private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]() + // Shuffles which have migrated. This is used to know when we are "done"; being done can change + // if a new shuffle file is created by a running task. + private val numMigratedShuffles = new AtomicInteger(0) + // Shuffles which are queued for migration & number of retries so far. + // Visible in storage for testing.
private[storage] val shufflesToMigrate = new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]() // Set if we encounter an error attempting to migrate and stop. @volatile private var stopped = false + @volatile private var stoppedRDD = + !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED) + @volatile private var stoppedShuffle = + !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) private val migrationPeers = mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]() @@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner( override def run(): Unit = { assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedRDD && !Thread.interrupted()) { logInfo("Iterating on migrating from the block manager.") + // Validate we have peers to migrate to. + val peers = bm.getPeers(false) + // If we have no peers give up. + if (peers.isEmpty) { + stopped = true + stoppedRDD = true + } try { + val startTime = System.nanoTime() logDebug("Attempting to replicate all cached RDD blocks") - decommissionRddCacheBlocks() + rddBlocksLeft = decommissionRddCacheBlocks() + lastRDDMigrationTime = startTime logInfo("Attempt to replicate all cached blocks done") logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => - logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + logInfo("Interrupted during RDD migration, stopping") + stoppedRDD = true case NonFatal(e) => - logError("Error occurred while trying to replicate for block manager decommissioning.", + logError("Error occurred replicating RDD for block manager decommissioning.", e) - stopped = true + stoppedRDD = true } } } @@ -162,20 +188,22 @@ private[storage] class BlockManagerDecommissioner( override def run() { assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) - while (!stopped && !Thread.interrupted()) { + while (!stopped && !stoppedShuffle && !Thread.interrupted()) { try { logDebug("Attempting to replicate all shuffle blocks") - refreshOffloadingShuffleBlocks() + val startTime = System.nanoTime() + shuffleBlocksLeft = refreshOffloadingShuffleBlocks() + lastShuffleMigrationTime = startTime logInfo("Done starting workers to migrate shuffle blocks") Thread.sleep(sleepInterval) } catch { case e: InterruptedException => logInfo("Interrupted during migration, will not refresh migrations.") - stopped = true + stoppedShuffle = true case NonFatal(e) => logError("Error occurred while trying to replicate for block manager decommissioning.", e) - stopped = true + stoppedShuffle = true } } } @@ -191,8 +219,9 @@ private[storage] class BlockManagerDecommissioner( * but rather shadows them. * Requires an Indexed based shuffle resolver. * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage. + * Returns true if we are not done migrating shuffle blocks. 
*/ - private[storage] def refreshOffloadingShuffleBlocks(): Unit = { + private[storage] def refreshOffloadingShuffleBlocks(): Boolean = { // Update the queue of shuffles to be migrated logInfo("Offloading shuffle blocks") val localShuffles = bm.migratableResolver.getStoredShuffles().toSet @@ -215,6 +244,12 @@ private[storage] class BlockManagerDecommissioner( deadPeers.foreach { peer => migrationPeers.get(peer).foreach(_.running = false) } + // If we don't have anyone to migrate to, give up. + if (!migrationPeers.values.exists(_.running)) { + stoppedShuffle = true + } + // If we found any new shuffles to migrate or otherwise have not migrated everything. + newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get() } /** @@ -231,16 +266,18 @@ private[storage] class BlockManagerDecommissioner( /** * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers * Visible for testing + * Returns true if we have not migrated all of our RDD blocks. */ - private[storage] def decommissionRddCacheBlocks(): Unit = { + private[storage] def decommissionRddCacheBlocks(): Boolean = { val replicateBlocksInfo = bm.getMigratableRDDBlocks() + // Refresh peers and validate we have somewhere to move blocks. if (replicateBlocksInfo.nonEmpty) { logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " + "for block manager decommissioning") } else { logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate") - return + return false } // TODO: We can sort these blocks based on some policy (LRU/blockSize etc) @@ -252,7 +289,9 @@ private[storage] class BlockManagerDecommissioner( if (blocksFailedReplication.nonEmpty) { logWarning("Blocks failed replication in cache decommissioning " + s"process: ${blocksFailedReplication.mkString(",")}") + return true } + return false } private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = { @@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner( } logInfo("Stopped storage decommissioner") } + + /* + * Returns the last migration time and a boolean indicating whether all blocks have been migrated. + * The last migration time is the minimum of the last migration times of any still-running + * migrations (if no migrations are running it is Long.MaxValue, so it never expires). + * This provides a timestamp such that, if no tasks have been running since that time, + * we know that every block that could be migrated has been migrated off. + */ + private[storage] def lastMigrationInfo(): (Long, Boolean) = { + if (stopped || (stoppedRDD && stoppedShuffle)) { + // Since we don't have anything left to migrate ever (since we don't restart once + // stopped), return that we're done with a validity timestamp that doesn't expire. + (Long.MaxValue, true) + } else { + // Choose the min of the active times. See the function description for more information. + val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) { + Math.min(lastRDDMigrationTime, lastShuffleMigrationTime) + } else if (!stoppedShuffle) { + lastShuffleMigrationTime + } else { + lastRDDMigrationTime + } + + // Technically we could have blocks left if we encountered an error, but those blocks will + // never be migrated, so we don't care about them.
+ val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD) + (lastMigrationTime, blocksMigrated) + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 3c34070e8bb9..bb0c33acc0af 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { assert(sleepyRdd.count() === 10) } - test("verify a task with all workers decommissioned succeeds") { + test("verify a running task with all workers decommissioned succeeds") { + // Wait for the executors to come up + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = 2, + timeout = 30000) // 30s + val input = sc.parallelize(1 to 10) // Listen for the job val sem = new Semaphore(0) @@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { sem.release() } }) - TestUtils.waitUntilExecutorsUp(sc = sc, - numExecutors = 2, - timeout = 30000) // 30s + val sleepyRdd = input.mapPartitions{ x => Thread.sleep(5000) // 5s x @@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false))) val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) - // Try and launch task after decommissioning, this should fail - val postDecommissioned = input.map(x => x) - val postDecomAsyncCount = postDecommissioned.countAsync() - val thrown = intercept[java.util.concurrent.TimeoutException]{ - val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds) - } - assert(postDecomAsyncCount.isCompleted === false, - "After exec decommission new task could not launch") } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 6a52f72938c6..25145dac5268 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore} +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Semaphore, TimeUnit} import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -69,9 +69,9 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS .set(config.STORAGE_DECOMMISSION_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle) - // Just replicate blocks as fast as we can during testing, there isn't another + // Just replicate blocks quickly during testing, there isn't another // workload we need to worry about. - .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) if (whenToDecom == TaskStarted) { // We are using accumulators below, make sure those are reported frequently. 
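Both the integration and unit suites drive migration with the same storage configs; a condensed sketch of a conf enabling full decommission-time migration (assuming the constants live in org.apache.spark.internal.config as usual; the 10 ms reattempt interval is only there to keep tests fast):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config

    // Sketch only: the knobs these suites exercise.
    val decomConf = new SparkConf(false)
      .set(config.STORAGE_DECOMMISSION_ENABLED, true) // allow this BlockManager to be decommissioned
      .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true) // migrate cached RDD blocks
      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) // migrate shuffle files
      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) // retry quickly (test-only)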
@@ -266,18 +266,17 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS val execIdToBlocksMapping = storageStatus.map( status => (status.blockManagerId.executorId, status.blocks)).toMap // No cached blocks should be present on executor which was decommissioned - assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), + assert( + !execIdToBlocksMapping.contains(execToDecommission) || + execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(), "Cache blocks should be migrated") if (persist) { // There should still be all the RDD blocks cached assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts) } - // Make the executor we decommissioned exit - sched.client.killExecutors(List(execToDecommission)) - - // Wait for the executor to be removed - executorRemovedSem.acquire(1) + // Wait for the executor to be removed automatically after migration. + assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES)) // Since the RDD is cached or shuffled so further usage of same RDD should use the // cached data. Original RDD partitions should not be recomputed i.e. accum diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala index 41b68d5978d1..74ad8bd2bcf9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import scala.concurrent.duration._ import org.mockito.{ArgumentMatchers => mc} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{atLeast => least, mock, times, verify, when} import org.scalatest.concurrent.Eventually._ import org.scalatest.matchers.must.Matchers @@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { private val sparkConf = new SparkConf(false) .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true) .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true) + // Just replicate blocks quickly during testing, as there isn't another + // workload we need to worry about. + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) private def registerShuffleBlocks( mockMigratableShuffleResolver: MigratableResolver, @@ -54,6 +57,113 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { } } + /** + * Validate a given configuration with the mocks. + * The fail variable controls if we expect migration to fail, in which case we expect + * a constant Long.MaxValue timestamp. + */ + private def validateDecommissionTimestamps(conf: SparkConf, bm: BlockManager, + migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = false) = { + // Verify the decommissioning manager timestamps and status + val bmDecomManager = new BlockManagerDecommissioner(conf, bm) + var previousTime: Option[Long] = None + try { + bmDecomManager.start() + eventually(timeout(100.second), interval(10.milliseconds)) { + val (currentTime, done) = bmDecomManager.lastMigrationInfo() + assert(done) + // Make sure the time stamp starts moving forward. 
+ if (!fail) { + previousTime match { + case None => + previousTime = Some(currentTime) + assert(false) + case Some(t) => + assert(t < currentTime) + } + } else { + // If we expect migration to fail we should get the max value quickly. + assert(currentTime === Long.MaxValue) + } + } + if (!fail) { + // Wait 5 seconds and assert times keep moving forward. + Thread.sleep(5000) + val (currentTime, done) = bmDecomManager.lastMigrationInfo() + assert(done && currentTime > previousTime.get) + } + } finally { + bmDecomManager.stop() + } + } + + test("test that with no blocks we finish migration") { + // Set up the mocks so we return empty + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + when(migratableShuffleBlockResolver.getStoredShuffles()) + .thenReturn(Seq()) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver) + } + + test("block decom manager with no migrations configured") { + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + val badConf = new SparkConf(false) + .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, false) + .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, false) + .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L) + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(badConf, bm, migratableShuffleBlockResolver, + fail = true) + } + + test("block decom manager with no peers") { + // Set up the mocks so we return one shuffle block + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq()) + + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver, + fail = true) + } + + + test("block decom manager with only shuffle files time moves forward") { + // Set up the mocks so we return one shuffle block + val bm = mock(classOf[BlockManager]) + val migratableShuffleBlockResolver = mock(classOf[MigratableResolver]) + registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1))) + when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver) + when(bm.getMigratableRDDBlocks()) + .thenReturn(Seq()) + when(bm.getPeers(mc.any())) + .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345))) + + // Verify the decom manager handles this correctly + validateDecommissionTimestamps(sparkConf, bm, migratableShuffleBlockResolver) + } + test("test shuffle and cached rdd migration without any error") { val blockTransferService = mock(classOf[BlockTransferService]) val bm = mock(classOf[BlockManager]) @@ -77,13 +187,36 @@ class 
BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers { try { bmDecomManager.start() - eventually(timeout(5.second), interval(10.milliseconds)) { + var previousRDDTime: Option[Long] = None + var previousShuffleTime: Option[Long] = None + + // We don't check that all blocks are migrated because our mock is always returning an RDD. + eventually(timeout(100.second), interval(10.milliseconds)) { assert(bmDecomManager.shufflesToMigrate.isEmpty == true) - verify(bm, times(1)).replicateBlock( + verify(bm, least(1)).replicateBlock( mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3))) verify(blockTransferService, times(2)) .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), mc.any(), mc.any(), mc.eq(StorageLevel.DISK_ONLY), mc.isNull()) + // Since we never "finish" the RDD blocks, make sure the time is always moving forward. + assert(bmDecomManager.rddBlocksLeft) + previousRDDTime match { + case None => + previousRDDTime = Some(bmDecomManager.lastRDDMigrationTime) + // Fail this pass so that eventually() retries; the next pass compares against this time. + assert(false) + case Some(t) => + assert(bmDecomManager.lastRDDMigrationTime > t) + } + // Since we do eventually finish the shuffle blocks, make sure they complete + // and that the time keeps moving forward. + assert(!bmDecomManager.shuffleBlocksLeft) + previousShuffleTime match { + case None => + previousShuffleTime = Some(bmDecomManager.lastShuffleMigrationTime) + // As above, force a retry so the next pass can compare times. + assert(false) + case Some(t) => + assert(bmDecomManager.lastShuffleMigrationTime > t) + } } } finally { bmDecomManager.stop()