 package org.apache.spark.storage
 
 import java.util.concurrent.ExecutorService
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
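
Note: the new AtomicInteger import backs the numMigratedShuffles counter introduced below; several ShuffleMigrationRunnable threads increment it concurrently, so a plain var would not be safe. A minimal, self-contained sketch of that pattern (the object and names here are illustrative, not part of the patch):

import java.util.concurrent.atomic.AtomicInteger

object AtomicCounterSketch {
  def main(args: Array[String]): Unit = {
    val migrated = new AtomicInteger(0)
    // Four "migration threads" each record 1000 completed migrations.
    val threads = (1 to 4).map { _ =>
      new Thread(() => (1 to 1000).foreach(_ => migrated.incrementAndGet()))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(migrated.get()) // always 4000; an unsynchronized `var n += 1` could drop updates
  }
}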
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
 
+  // Used for tracking if our migrations are complete. Readable for testing
+  @volatile private[storage] var lastRDDMigrationTime: Long = 0
+  @volatile private[storage] var lastShuffleMigrationTime: Long = 0
+  @volatile private[storage] var rddBlocksLeft: Boolean = true
+  @volatile private[storage] var shuffleBlocksLeft: Boolean = true
+
   /**
    * This runnable consumes any shuffle blocks in the queue for migration. This part of a
    * producer/consumer where the main migration loop updates the queue of blocks to be migrated
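
Note: the four fields added above are @volatile and private[storage] so the migration threads can publish progress and code in the same package (e.g. tests) can read it without extra locking. A hypothetical polling helper, not part of the patch, could look like:

package org.apache.spark.storage

// Hypothetical test-side helper: waits until the RDD migration loop has
// completed at least one pass, by watching the package-private timestamp.
object DecommissionPolling {
  def waitForFirstRddPass(d: BlockManagerDecommissioner, timeoutMs: Long = 10000L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (System.currentTimeMillis() < deadline) {
      if (d.lastRDDMigrationTime > 0L) return true
      Thread.sleep(50L)
    }
    false
  }
}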
@@ -91,10 +98,11 @@ private[storage] class BlockManagerDecommissioner(
                     null) // class tag, we don't need for shuffle
                   logDebug(s"Migrated sub block ${blockId}")
                 }
-                logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+                logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
               } else {
                 logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
               }
+              numMigratedShuffles.incrementAndGet()
           }
         }
         // This catch is intentionally outside of the while running block.
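
Note: as the doc comment above this runnable says, it is the consumer half of a producer/consumer pair around shufflesToMigrate; refreshOffloadingShuffleBlocks (further down) is the producer. A stripped-down sketch of that shape, with illustrative types that are not part of the patch:

import java.util.concurrent.ConcurrentLinkedQueue

object QueueShapeSketch {
  final case class Work(id: Int)

  // (work item, retry count so far), mirroring (ShuffleBlockInfo, Int)
  val queue = new ConcurrentLinkedQueue[(Work, Int)]()

  // Producer: a refresh pass offers any newly discovered work with retry count 0.
  def producerPass(found: Seq[Work]): Unit = found.foreach(w => queue.offer((w, 0)))

  // Consumer: each per-peer thread polls; poll() returns null when empty, hence Option().
  def consumerStep(): Option[(Work, Int)] = Option(queue.poll())
}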
@@ -115,12 +123,21 @@ private[storage] class BlockManagerDecommissioner(
   // Shuffles which are either in queue for migrations or migrated
   private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
 
+  // Shuffles which have migrated. This is used to know when we are "done"; being done can change
+  // if a new shuffle file is created by a running task.
+  private val numMigratedShuffles = new AtomicInteger(0)
+
   // Shuffles which are queued for migration & number of retries so far.
+  // Visible in storage for testing.
   private[storage] val shufflesToMigrate =
     new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
+  @volatile private var stoppedRDD =
+    !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
+  @volatile private var stoppedShuffle =
+    !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
 
   private val migrationPeers =
     mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
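
Note: the two new stopped* flags start out true whenever the corresponding migration type is disabled in the SparkConf, so the runnables below exit immediately instead of spinning. A sketch of enabling both paths; the string keys shown are assumptions about the names behind STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED and STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, so check the config constants for the exact values:

import org.apache.spark.SparkConf

object DecommissionConfSketch {
  // Assumed key names; verify against the config entries referenced above.
  val conf: SparkConf = new SparkConf()
    .set("spark.storage.decommission.enabled", "true")
    .set("spark.storage.decommission.rddBlocks.enabled", "true")
    .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
}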
@@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner(
 
     override def run(): Unit = {
       assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
-      while (!stopped && !Thread.interrupted()) {
+      while (!stopped && !stoppedRDD && !Thread.interrupted()) {
         logInfo("Iterating on migrating from the block manager.")
+        // Validate we have peers to migrate to.
+        val peers = bm.getPeers(false)
+        // If we have no peers give up.
+        if (peers.isEmpty) {
+          stopped = true
+          stoppedRDD = true
+        }
         try {
+          val startTime = System.nanoTime()
           logDebug("Attempting to replicate all cached RDD blocks")
-          decommissionRddCacheBlocks()
+          rddBlocksLeft = decommissionRddCacheBlocks()
+          lastRDDMigrationTime = startTime
           logInfo("Attempt to replicate all cached blocks done")
           logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
           Thread.sleep(sleepInterval)
         } catch {
           case e: InterruptedException =>
-            logInfo("Interrupted during migration, will not refresh migrations.")
-            stopped = true
+            logInfo("Interrupted during RDD migration, stopping")
+            stoppedRDD = true
           case NonFatal(e) =>
-            logError("Error occurred while trying to replicate for block manager decommissioning.",
+            logError("Error occurred replicating RDD for block manager decommissioning.",
               e)
-            stopped = true
+            stoppedRDD = true
         }
       }
     }
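
Note the ordering in the loop above: startTime is taken before the pass and only published to lastRDDMigrationTime after the pass finishes, so the timestamp never claims coverage of blocks created while the pass was running. The shuffle loop below uses the same pattern. Restated generically with illustrative names only:

object PassTimingSketch {
  @volatile var lastPassTime: Long = 0L

  // doWork runs one full migration pass and reports whether anything is left.
  def onePass(doWork: () => Boolean): Boolean = {
    val start = System.nanoTime()
    val blocksLeft = doWork()   // new blocks may appear while this runs...
    lastPassTime = start        // ...so publish the pre-pass time: a conservative bound
    blocksLeft
  }
}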
@@ -162,20 +188,22 @@ private[storage] class BlockManagerDecommissioner(
 
     override def run() {
       assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
-      while (!stopped && !Thread.interrupted()) {
+      while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
         try {
           logDebug("Attempting to replicate all shuffle blocks")
-          refreshOffloadingShuffleBlocks()
+          val startTime = System.nanoTime()
+          shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
+          lastShuffleMigrationTime = startTime
           logInfo("Done starting workers to migrate shuffle blocks")
           Thread.sleep(sleepInterval)
         } catch {
           case e: InterruptedException =>
             logInfo("Interrupted during migration, will not refresh migrations.")
-            stopped = true
+            stoppedShuffle = true
           case NonFatal(e) =>
             logError("Error occurred while trying to replicate for block manager decommissioning.",
               e)
-            stopped = true
+            stoppedShuffle = true
         }
       }
     }
@@ -191,8 +219,9 @@ private[storage] class BlockManagerDecommissioner(
    * but rather shadows them.
    * Requires an Indexed based shuffle resolver.
    * Note: if called in testing please call stopOffloadingShuffleBlocks to avoid thread leakage.
+   * Returns true if we are not done migrating shuffle blocks.
    */
-  private[storage] def refreshOffloadingShuffleBlocks(): Unit = {
+  private[storage] def refreshOffloadingShuffleBlocks(): Boolean = {
     // Update the queue of shuffles to be migrated
     logInfo("Offloading shuffle blocks")
     val localShuffles = bm.migratableResolver.getStoredShuffles().toSet
@@ -215,6 +244,12 @@ private[storage] class BlockManagerDecommissioner(
     deadPeers.foreach { peer =>
       migrationPeers.get(peer).foreach(_.running = false)
     }
+    // If we don't have anyone to migrate to, give up.
+    if (migrationPeers.values.find(_.running == true).isEmpty) {
+      stoppedShuffle = true
+    }
+    // If we found any new shuffles to migrate or otherwise have not migrated everything.
+    newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
   }
 
   /**
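
Note: the Boolean returned above is true while migration is still in progress: either this pass found new shuffles, or fewer shuffles have finished migrating than are being tracked. A worked example with made-up numbers:

object StillMigratingSketch {
  // Say a pass found nothing new, 12 shuffles are tracked, and 9 have finished migrating.
  val newlyFound = 0
  val tracked = 12      // migratingShuffles.size
  val migrated = 9      // numMigratedShuffles.get()
  val stillMigrating: Boolean = newlyFound > 0 || tracked > migrated // true: 3 still in flight
}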
@@ -231,16 +266,18 @@ private[storage] class BlockManagerDecommissioner(
   /**
    * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
    * Visible for testing
+   * Returns true if we have not migrated all of our RDD blocks.
    */
-  private[storage] def decommissionRddCacheBlocks(): Unit = {
+  private[storage] def decommissionRddCacheBlocks(): Boolean = {
     val replicateBlocksInfo = bm.getMigratableRDDBlocks()
+    // Refresh peers and validate we have somewhere to move blocks.
 
     if (replicateBlocksInfo.nonEmpty) {
       logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
         "for block manager decommissioning")
     } else {
       logWarning(s"Asked to decommission RDD cache blocks, but no blocks to migrate")
-      return
+      return false
     }
 
     // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
@@ -252,7 +289,9 @@ private[storage] class BlockManagerDecommissioner(
     if (blocksFailedReplication.nonEmpty) {
       logWarning("Blocks failed replication in cache decommissioning " +
         s"process: ${blocksFailedReplication.mkString(",")}")
+      return true
     }
+    return false
   }
 
   private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*
+   * Returns the last migration time and a boolean for whether all blocks have been migrated.
+   * The last migration time is calculated to be the minimum of the last migration of any
+   * running migration (and if there are no currently running migrations it is set to current).
+   * This provides a timestamp such that, if no tasks have run since that time, we know that
+   * every block which could be migrated has been migrated off.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+    if (stopped || (stoppedRDD && stoppedShuffle)) {
+      // Since we don't have anything left to migrate ever (since we don't restart once
+      // stopped), return that we're done with a validity timestamp that doesn't expire.
+      (Long.MaxValue, true)
+    } else {
+      // Choose the min of the active times. See the function description for more information.
+      val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) {
+        Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
+      } else if (!stoppedShuffle) {
+        lastShuffleMigrationTime
+      } else {
+        lastRDDMigrationTime
+      }
+
+      // Technically we could have blocks left if we encountered an error, but those blocks will
+      // never be migrated, so we don't care about them.
+      val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && (!rddBlocksLeft || stoppedRDD)
+      (lastMigrationTime, blocksMigrated)
+    }
+  }
 }
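
Note: lastMigrationInfo() is what the rest of the executor can consult to decide whether decommissioning has drained everything. A hypothetical caller-side check, not part of this patch; the lastTaskStartNs input is assumed to come from whatever tracks task start times:

package org.apache.spark.storage

object ExitCheckSketch {
  // Safe to exit only if everything known so far has migrated AND no task has started
  // since that migration pass began (a newer task could have produced new blocks).
  def canExit(d: BlockManagerDecommissioner, lastTaskStartNs: Long): Boolean = {
    val (lastMigrationTime, allBlocksMigrated) = d.lastMigrationInfo()
    allBlocksMigrated && lastTaskStartNs < lastMigrationTime
  }
}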