@@ -28,26 +28,31 @@ import org.apache.spark.scheduler._
2828 */
2929@ DeveloperApi
3030class StorageStatusListener extends SparkListener {
31- private val executorIdToStorageStatus = mutable.Map [String , StorageStatus ]()
31+ // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
32+ private [storage] val executorIdToStorageStatus = mutable.Map [String , StorageStatus ]()
3233
3334 def storageStatusList = executorIdToStorageStatus.values.toSeq
3435
3536 /** Update storage status list to reflect updated block statuses */
36- def updateStorageStatus (execId : String , updatedBlocks : Seq [(BlockId , BlockStatus )]) {
37- val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
37+ private def updateStorageStatus (execId : String , updatedBlocks : Seq [(BlockId , BlockStatus )]) {
38+ val filteredStatus = executorIdToStorageStatus.get( execId)
3839 filteredStatus.foreach { storageStatus =>
3940 updatedBlocks.foreach { case (blockId, updatedStatus) =>
40- storageStatus.blocks(blockId) = updatedStatus
41+ if (updatedStatus.storageLevel == StorageLevel .NONE ) {
42+ storageStatus.blocks.remove(blockId)
43+ } else {
44+ storageStatus.blocks(blockId) = updatedStatus
45+ }
4146 }
4247 }
4348 }
4449
4550 /** Update storage status list to reflect the removal of an RDD from the cache */
46- def updateStorageStatus (unpersistedRDDId : Int ) {
51+ private def updateStorageStatus (unpersistedRDDId : Int ) {
4752 storageStatusList.foreach { storageStatus =>
4853 val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
4954 unpersistedBlocksIds.foreach { blockId =>
50- storageStatus.blocks(blockId) = BlockStatus ( StorageLevel . NONE , 0L , 0L , 0L )
55+ storageStatus.blocks.remove (blockId)
5156 }
5257 }
5358 }
0 commit comments