diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 368835a86749..9ba21cfcde01 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
-import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
+import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
@@ -843,7 +843,9 @@ class SparkContext(config: SparkConf) extends Logging {
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
- StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+ val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+ StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+ rddInfos.filter(_.isCached)
}
/**
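For context, a minimal sketch of how the reworked getRDDStorageInfo behaves from the caller's side (illustrative only; the RDD and its sizes below are assumptions, not part of this patch):

    import org.apache.spark.storage.StorageLevel
    val rdd = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)  // hypothetical cached RDD
    rdd.count()                                      // materialize the cache
    val info = sc.getRDDStorageInfo.head             // only RDDs that satisfy isCached are reported
    // numCachedPartitions, memSize, diskSize etc. are filled in from the executor storage statuses
    assert(info.storageLevel == StorageLevel.MEMORY_ONLY)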
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 94f5a4bb2e9c..bd31e3c5a187 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -267,9 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
private def storageStatus: Array[StorageStatus] = {
- blockManagerInfo.map { case(blockManagerId, info) =>
- val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
- new StorageStatus(blockManagerId, info.maxMem, blockMap)
+ blockManagerInfo.map { case (blockManagerId, info) =>
+ new StorageStatus(blockManagerId, info.maxMem, info.blocks)
}.toArray
}
@@ -424,7 +423,14 @@ case class BlockStatus(
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
- tachyonSize: Long)
+ tachyonSize: Long) {
+ def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+}
+
+@DeveloperApi
+object BlockStatus {
+ def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+}
private[spark] class BlockManagerInfo(
val blockManagerId: BlockManagerId,
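For reference, a tiny sketch of how the new BlockStatus helpers compose (values are illustrative):

    import org.apache.spark.storage.{BlockStatus, StorageLevel}
    BlockStatus(StorageLevel.MEMORY_AND_DISK, 100L, 0L, 0L).isCached   // true: some bytes are stored
    BlockStatus.empty.isCached                                         // false: memory, disk and Tachyon sizes are all zero

StorageStatus.removeBlock (below) passes BlockStatus.empty to updateStorageInfo to zero out a dropped block's contribution.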
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 687586490abf..e939318a029d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -30,7 +30,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ val maxMem = storageStatusList.map(_.maxMem).sum
maxMem / 1024 / 1024
}
})
@@ -38,7 +38,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ val remainingMem = storageStatusList.map(_.memRemaining).sum
remainingMem / 1024 / 1024
}
})
@@ -46,8 +46,8 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ val maxMem = storageStatusList.map(_.maxMem).sum
+ val remainingMem = storageStatusList.map(_.memRemaining).sum
(maxMem - remainingMem) / 1024 / 1024
}
})
@@ -55,11 +55,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList
- .flatMap(_.blocks.values.map(_.diskSize))
- .reduceOption(_ + _)
- .getOrElse(0L)
-
+ val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
diskSpaceUsed / 1024 / 1024
}
})
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 5a72e216872a..120c327a7e58 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -34,6 +34,8 @@ class RDDInfo(
var diskSize = 0L
var tachyonSize = 0L
+ def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+
override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 41c960c867e2..d9066f766476 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -35,13 +35,12 @@ class StorageStatusListener extends SparkListener {
/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
- val filteredStatus = executorIdToStorageStatus.get(execId)
- filteredStatus.foreach { storageStatus =>
+ executorIdToStorageStatus.get(execId).foreach { storageStatus =>
updatedBlocks.foreach { case (blockId, updatedStatus) =>
if (updatedStatus.storageLevel == StorageLevel.NONE) {
- storageStatus.blocks.remove(blockId)
+ storageStatus.removeBlock(blockId)
} else {
- storageStatus.blocks(blockId) = updatedStatus
+ storageStatus.updateBlock(blockId, updatedStatus)
}
}
}
@@ -50,9 +49,8 @@ class StorageStatusListener extends SparkListener {
/** Update storage status list to reflect the removal of an RDD from the cache */
private def updateStorageStatus(unpersistedRDDId: Int) {
storageStatusList.foreach { storageStatus =>
- val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
- unpersistedBlocksIds.foreach { blockId =>
- storageStatus.blocks.remove(blockId)
+ storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
+ storageStatus.removeBlock(blockId)
}
}
}
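To make the listener's new bookkeeping concrete, a small sketch of the add/drop lifecycle on a single StorageStatus (the block manager ID, block IDs and sizes are made up for illustration):

    import org.apache.spark.storage._
    val status = new StorageStatus(BlockManagerId("exec-1", "host-a", 7070, 7070), 1000L)
    // A task reports a newly persisted RDD block, so the listener calls updateBlock
    status.updateBlock(RDDBlockId(5, 0), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L, 0L))
    status.numRddBlocksById(5)   // 1
    status.memUsedByRdd(5)       // 100
    // A later task reports the block with StorageLevel.NONE, so the listener calls removeBlock
    status.removeBlock(RDDBlockId(5, 0))
    status.numRddBlocksById(5)   // 0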
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 177281f66336..0a0a448baa2e 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -20,122 +20,258 @@ package org.apache.spark.storage
import scala.collection.Map
import scala.collection.mutable
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Storage information for each BlockManager.
+ *
+ * This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
+ * class cannot mutate the source of the information. Accesses are not thread-safe.
*/
@DeveloperApi
-class StorageStatus(
- val blockManagerId: BlockManagerId,
- val maxMem: Long,
- val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
+class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
- def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+ /**
+ * Internal representation of the blocks stored in this block manager.
+ *
+ * We store RDD blocks and non-RDD blocks separately to allow quick retrievals of RDD blocks.
+ * These collections should only be mutated through the add/update/removeBlock methods.
+ */
+ private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
+ private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
- def memUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+ /**
+ * Storage information for the blocks, covering memory, disk, and off-heap memory usage.
+ *
+ * As with the block maps, we store the storage information separately for RDD blocks and
+ * non-RDD blocks for the same reason. In particular, RDD storage information is stored
+ * in a map from the RDD ID to the following 4-tuple:
+ *
+ * (memory size, disk size, off-heap size, storage level)
+ *
+ * We assume that all the blocks that belong to the same RDD have the same storage level.
+ * The storage level field is not relevant to non-RDD blocks, however, so the storage information for
+ * non-RDD blocks contains only the first 3 fields (in the same order).
+ */
+ private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
+ private var _nonRddStorageInfo: (Long, Long, Long) = (0L, 0L, 0L)
- def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+ /** Create a storage status with an initial set of blocks, leaving the source unmodified. */
+ def this(bmid: BlockManagerId, maxMem: Long, initialBlocks: Map[BlockId, BlockStatus]) {
+ this(bmid, maxMem)
+ initialBlocks.foreach { case (bid, bstatus) => addBlock(bid, bstatus) }
+ }
- def diskUsedByRDD(rddId: Int) =
- rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+ /**
+ * Return the blocks stored in this block manager.
+ *
+ * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+ * concatenating them together. Much faster alternatives exist for common operations such as
+ * contains, get, and size.
+ */
+ def blocks: Map[BlockId, BlockStatus] = _nonRddBlocks ++ rddBlocks
- def memRemaining: Long = maxMem - memUsed
+ /**
+ * Return the RDD blocks stored in this block manager.
+ *
+ * Note that this is somewhat expensive, as it involves cloning the underlying maps and then
+ * concatenating them together. Much faster alternatives exist for common operations such as
+ * getting the memory, disk, and off-heap memory sizes occupied by this RDD.
+ */
+ def rddBlocks: Map[BlockId, BlockStatus] = _rddBlocks.flatMap { case (_, blocks) => blocks }
- def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
-}
+ /** Return the blocks that belong to the given RDD stored in this block manager. */
+ def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] = {
+ _rddBlocks.get(rddId).getOrElse(Map.empty)
+ }
-/** Helper methods for storage-related objects. */
-private[spark] object StorageUtils {
+ /** Add the given block to this storage status. If it already exists, overwrite it. */
+ private[spark] def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+ updateStorageInfo(blockId, blockStatus)
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
+ case _ =>
+ _nonRddBlocks(blockId) = blockStatus
+ }
+ }
+
+ /** Update the given block in this storage status. If it doesn't already exist, add it. */
+ private[spark] def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+ addBlock(blockId, blockStatus)
+ }
+
+ /** Remove the given block from this storage status. */
+ private[spark] def removeBlock(blockId: BlockId): Option[BlockStatus] = {
+ updateStorageInfo(blockId, BlockStatus.empty)
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ // Actually remove the block, if it exists
+ if (_rddBlocks.contains(rddId)) {
+ val removed = _rddBlocks(rddId).remove(blockId)
+ // If the given RDD has no more blocks left, remove the RDD
+ if (_rddBlocks(rddId).isEmpty) {
+ _rddBlocks.remove(rddId)
+ }
+ removed
+ } else {
+ None
+ }
+ case _ =>
+ _nonRddBlocks.remove(blockId)
+ }
+ }
/**
- * Returns basic information of all RDDs persisted in the given SparkContext. This does not
- * include storage information.
+ * Return whether the given block is stored in this block manager in O(1) time.
+ * Note that this is much faster than `this.blocks.contains`, which is O(blocks) time.
*/
- def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
- sc.persistentRdds.values.map { rdd =>
- val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
- val rddNumPartitions = rdd.partitions.size
- val rddStorageLevel = rdd.getStorageLevel
- val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
- rddInfo
- }.toArray
+ def containsBlock(blockId: BlockId): Boolean = {
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddBlocks.get(rddId).exists(_.contains(blockId))
+ case _ =>
+ _nonRddBlocks.contains(blockId)
+ }
}
- /** Returns storage information of all RDDs persisted in the given SparkContext. */
- def rddInfoFromStorageStatus(
- storageStatuses: Seq[StorageStatus],
- sc: SparkContext): Array[RDDInfo] = {
- rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
+ /**
+ * Return the given block stored in this block manager in O(1) time.
+ * Note that this is much faster than `this.blocks.get`, which is O(blocks) time.
+ */
+ def getBlock(blockId: BlockId): Option[BlockStatus] = {
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddBlocks.get(rddId).map(_.get(blockId)).flatten
+ case _ =>
+ _nonRddBlocks.get(blockId)
+ }
}
- /** Returns storage information of all RDDs in the given list. */
- def rddInfoFromStorageStatus(
- storageStatuses: Seq[StorageStatus],
- rddInfos: Seq[RDDInfo],
- updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
-
- // Mapping from a block ID -> its status
- val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
-
- // Record updated blocks, if any
- updatedBlocks
- .collect { case (id: RDDBlockId, status) => (id, status) }
- .foreach { case (id, status) => blockMap(id) = status }
-
- // Mapping from RDD ID -> an array of associated BlockStatuses
- val rddBlockMap = blockMap
- .groupBy { case (k, _) => k.rddId }
- .mapValues(_.values.toArray)
-
- // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
- val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
-
- val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
- // Add up memory, disk and Tachyon sizes
- val persistedBlocks =
- blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
- val _storageLevel =
- if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
- val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
- val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
- val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
- rddInfoMap.get(rddId).map { rddInfo =>
- rddInfo.storageLevel = _storageLevel
- rddInfo.numCachedPartitions = persistedBlocks.length
- rddInfo.memSize = memSize
- rddInfo.diskSize = diskSize
- rddInfo.tachyonSize = tachyonSize
- rddInfo
- }
- }.toArray
+ /**
+ * Return the number of blocks stored in this block manager in O(RDDs) time.
+ * Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
+ */
+ def numBlocks: Int = _nonRddBlocks.size + numRddBlocks
+
+ /**
+ * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
+ * Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
+ */
+ def numRddBlocks: Int = _rddBlocks.values.map(_.size).sum
- scala.util.Sorting.quickSort(rddStorageInfos)
- rddStorageInfos
+ /**
+ * Return the number of blocks that belong to the given RDD in O(1) time.
+ * Note that this is much faster than `this.rddBlocksById(rddId).size`, which is
+ * O(blocks in this RDD) time.
+ */
+ def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+
+ /** Return the memory remaining in this block manager. */
+ def memRemaining: Long = maxMem - memUsed
+
+ /** Return the memory used by this block manager. */
+ def memUsed: Long =
+ _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+
+ /** Return the disk space used by this block manager. */
+ def diskUsed: Long =
+ _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
+
+ /** Return the off-heap space used by this block manager. */
+ def offHeapUsed: Long =
+ _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
+
+ /** Return the memory used by the given RDD in this block manager in O(1) time. */
+ def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
+
+ /** Return the disk space used by the given RDD in this block manager in O(1) time. */
+ def diskUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+
+ /** Return the off-heap space used by the given RDD in this block manager in O(1) time. */
+ def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)
+
+ /** Return the storage level, if any, used by the given RDD in this block manager. */
+ def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)
+
+ /**
+ * Update the relevant storage info, taking into account any existing status for this block.
+ */
+ private def updateStorageInfo(blockId: BlockId, newBlockStatus: BlockStatus): Unit = {
+ val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
+ val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
+ val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
+ val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize
+ val level = newBlockStatus.storageLevel
+
+ // Compute new info from old info
+ val (oldMem, oldDisk, oldTachyon) = blockId match {
+ case RDDBlockId(rddId, _) =>
+ _rddStorageInfo.get(rddId)
+ .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) }
+ .getOrElse((0L, 0L, 0L))
+ case _ =>
+ _nonRddStorageInfo
+ }
+ val newMem = math.max(oldMem + changeInMem, 0L)
+ val newDisk = math.max(oldDisk + changeInDisk, 0L)
+ val newTachyon = math.max(oldTachyon + changeInTachyon, 0L)
+
+ // Set the correct info
+ blockId match {
+ case RDDBlockId(rddId, _) =>
+ // If this RDD is no longer persisted, remove it
+ if (newMem + newDisk + newTachyon == 0) {
+ _rddStorageInfo.remove(rddId)
+ } else {
+ _rddStorageInfo(rddId) = (newMem, newDisk, newTachyon, level)
+ }
+ case _ =>
+ _nonRddStorageInfo = (newMem, newDisk, newTachyon)
+ }
}
- /** Returns a mapping from BlockId to the locations of the associated block. */
- def blockLocationsFromStorageStatus(
- storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
- val blockLocationPairs = storageStatuses.flatMap { storageStatus =>
- storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) }
+}
+
+/** Helper methods for storage-related objects. */
+private[spark] object StorageUtils {
+
+ /**
+ * Update the given list of RDDInfos with the given list of storage statuses.
+ * This method overwrites the old values stored in the RDDInfos.
+ */
+ def updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[StorageStatus]): Unit = {
+ rddInfos.foreach { rddInfo =>
+ val rddId = rddInfo.id
+ // Assume all blocks belonging to the same RDD have the same storage level
+ val storageLevel = statuses
+ .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
+ val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
+ val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
+ val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
+ val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
+
+ rddInfo.storageLevel = storageLevel
+ rddInfo.numCachedPartitions = numCachedPartitions
+ rddInfo.memSize = memSize
+ rddInfo.diskSize = diskSize
+ rddInfo.tachyonSize = tachyonSize
}
- blockLocationPairs.toMap
- .groupBy { case (blockId, _) => blockId }
- .mapValues(_.values.toSeq)
}
- /** Filters the given list of StorageStatus by the given RDD ID. */
- def filterStorageStatusByRDD(
- storageStatuses: Seq[StorageStatus],
- rddId: Int): Array[StorageStatus] = {
- storageStatuses.map { status =>
- val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq
- val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*)
- new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap)
- }.toArray
+ /**
+ * Return a mapping from block ID to its locations for each block that belongs to the given RDD.
+ */
+ def getRddBlockLocations(rddId: Int, statuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
+ val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
+ statuses.foreach { status =>
+ status.rddBlocksById(rddId).foreach { case (bid, _) =>
+ val location = status.blockManagerId.hostPort
+ blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
+ }
+ }
+ blockLocations
}
+
}
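To make the intended aggregation concrete, a short sketch of the two StorageUtils helpers over a couple of executors (block manager IDs, block IDs and sizes are made up for illustration):

    import org.apache.spark.storage._
    val status1 = new StorageStatus(BlockManagerId("exec-1", "host-a", 7070, 7070), 1000L)
    val status2 = new StorageStatus(BlockManagerId("exec-2", "host-b", 7070, 7070), 1000L)
    status1.addBlock(RDDBlockId(0, 0), BlockStatus(StorageLevel.MEMORY_AND_DISK, 100L, 200L, 0L))
    status2.addBlock(RDDBlockId(0, 1), BlockStatus(StorageLevel.MEMORY_AND_DISK, 50L, 0L, 0L))

    // Fill in an RDDInfo from the per-executor statuses: sizes are summed across executors
    val rddInfo = new RDDInfo(0, "example", 10, StorageLevel.MEMORY_AND_DISK)
    StorageUtils.updateRddInfo(Seq(rddInfo), Seq(status1, status2))
    // rddInfo.numCachedPartitions == 2, rddInfo.memSize == 150, rddInfo.diskSize == 200

    // Block locations for RDD 0, keyed by block ID
    StorageUtils.getRddBlockLocations(0, Seq(status1, status2))
    // Map(RDDBlockId(0, 0) -> Seq("host-a:7070"), RDDBlockId(0, 1) -> Seq("host-b:7070"))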
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index b358c855e1c8..b814b0e6b850 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -49,9 +49,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
- val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
- val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
+ val maxMem = storageStatusList.map(_.maxMem).sum
+ val memUsed = storageStatusList.map(_.memUsed).sum
+ val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
@@ -80,7 +80,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
- {execInfoSorted.map(execRow(_))}
+ {execInfoSorted.map(execRow)}
@@ -91,7 +91,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
<li><strong>Memory:</strong>
{Utils.bytesToString(memUsed)} Used
({Utils.bytesToString(maxMem)} Total) </li>
- <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
+ <li><strong>Disk:</strong> {Utils.bytesToString(diskUsed)} Used </li>
@@ -145,7 +145,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
- val rddBlocks = status.blocks.size
+ val rddBlocks = status.numBlocks
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 2155633b8096..84ac53da4755 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -45,12 +45,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers)
// Block table
- val filteredStorageStatusList = StorageUtils.filterStorageStatusByRDD(storageStatusList, rddId)
- val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name)
- val blockLocations = StorageUtils.blockLocationsFromStorageStatus(filteredStorageStatusList)
- val blocks = blockStatuses.map { case (blockId, status) =>
- (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
- }
+ val blockLocations = StorageUtils.getRddBlockLocations(rddId, storageStatusList)
+ val blocks = storageStatusList
+ .flatMap(_.rddBlocksById(rddId))
+ .sortWith(_._1.name < _._1.name)
+ .map { case (blockId, status) =>
+ (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+ }
val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks)
val content =
@@ -119,10 +120,10 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
<td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
<td>
- {Utils.bytesToString(status.memUsedByRDD(rddId))}
+ {Utils.bytesToString(status.memUsedByRdd(rddId))}
({Utils.bytesToString(status.memRemaining)} Remaining)
</td>
- <td>{Utils.bytesToString(status.diskUsedByRDD(rddId))}</td>
+ <td>{Utils.bytesToString(status.diskUsedByRdd(rddId))}</td>
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 0cc0cf311717..5f6740d49552 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -41,19 +41,18 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
*/
@DeveloperApi
class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
- private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
+ private[ui] val _rddInfoMap = mutable.Map[Int, RDDInfo]() // exposed for testing
def storageStatusList = storageStatusListener.storageStatusList
/** Filter RDD info to include only those with cached partitions */
def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
- /** Update each RDD's info to reflect any updates to the RDD's storage status */
- private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
- val rddInfos = _rddInfoMap.values.toSeq
- val updatedRddInfos =
- StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
- updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
+ /** Update the storage info of the RDDs whose blocks are among the given updated blocks */
+ private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
+ val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
+ val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
+ StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index fb18c3ebfe46..e6ab538d77bc 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import org.scalatest.{Assertions, FunSuite}
+import org.apache.spark.storage.StorageLevel
class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
test("getPersistentRDDs only returns RDDs that are marked as cached") {
@@ -35,26 +36,33 @@ class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
test("getPersistentRDDs returns an immutable map") {
sc = new SparkContext("local", "test")
val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
val myRdds = sc.getPersistentRDDs
assert(myRdds.size === 1)
- assert(myRdds.values.head === rdd1)
+ assert(myRdds(0) === rdd1)
+ assert(myRdds(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
+ // myRdds2 should have 2 RDDs, but myRdds should not change
val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache()
-
- // getPersistentRDDs should have 2 RDDs, but myRdds should not change
- assert(sc.getPersistentRDDs.size === 2)
+ val myRdds2 = sc.getPersistentRDDs
+ assert(myRdds2.size === 2)
+ assert(myRdds2(0) === rdd1)
+ assert(myRdds2(1) === rdd2)
+ assert(myRdds2(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
+ assert(myRdds2(1).getStorageLevel === StorageLevel.MEMORY_ONLY)
assert(myRdds.size === 1)
+ assert(myRdds(0) === rdd1)
+ assert(myRdds(0).getStorageLevel === StorageLevel.MEMORY_ONLY)
}
test("getRDDStorageInfo only reports on RDDs that actually persist data") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
assert(sc.getRDDStorageInfo.size === 0)
-
rdd.collect()
assert(sc.getRDDStorageInfo.size === 1)
+ assert(sc.getRDDStorageInfo.head.isCached)
+ assert(sc.getRDDStorageInfo.head.memSize > 0)
+ assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
}
test("call sites report correct locations") {
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 2179c6dd3302..51fb646a3cb6 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -41,13 +41,13 @@ class StorageStatusListenerSuite extends FunSuite {
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
// Block manager remove
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
@@ -67,14 +67,14 @@ class StorageStatusListenerSuite extends FunSuite {
val taskMetrics = new TaskMetrics
// Task end with no updated blocks
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}
test("task end with updated blocks") {
@@ -90,20 +90,20 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics2.updatedBlocks = Some(Seq(block3))
// Task end with new blocks
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
// Task end with dropped blocks
val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
@@ -112,17 +112,17 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
- assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+ assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
- assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
- assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
- assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+ assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}
test("unpersist RDD") {
@@ -137,16 +137,16 @@ class StorageStatusListenerSuite extends FunSuite {
taskMetrics2.updatedBlocks = Some(Seq(block3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
// Unpersist RDD
listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
- assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
- assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+ assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
- assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+ assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
}
}
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
new file mode 100644
index 000000000000..38678bbd1dd2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+
+/**
+ * Test various functionalities in StorageUtils and StorageStatus.
+ */
+class StorageSuite extends FunSuite {
+ private val memAndDisk = StorageLevel.MEMORY_AND_DISK
+
+ // For testing add, update, and remove (for non-RDD blocks)
+ private def storageStatus1: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ assert(status.blocks.isEmpty)
+ assert(status.rddBlocks.isEmpty)
+ assert(status.memUsed === 0L)
+ assert(status.memRemaining === 1000L)
+ assert(status.diskUsed === 0L)
+ assert(status.offHeapUsed === 0L)
+ status.addBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(TestBlockId("faa"), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status
+ }
+
+ test("storage status add non-RDD blocks") {
+ val status = storageStatus1
+ assert(status.blocks.size === 3)
+ assert(status.blocks.contains(TestBlockId("foo")))
+ assert(status.blocks.contains(TestBlockId("fee")))
+ assert(status.blocks.contains(TestBlockId("faa")))
+ assert(status.rddBlocks.isEmpty)
+ assert(status.memUsed === 30L)
+ assert(status.memRemaining === 970L)
+ assert(status.diskUsed === 60L)
+ assert(status.offHeapUsed === 3L)
+ }
+
+ test("storage status update non-RDD blocks") {
+ val status = storageStatus1
+ status.updateBlock(TestBlockId("foo"), BlockStatus(memAndDisk, 50L, 100L, 1L))
+ status.updateBlock(TestBlockId("fee"), BlockStatus(memAndDisk, 100L, 20L, 0L))
+ assert(status.blocks.size === 3)
+ assert(status.memUsed === 160L)
+ assert(status.memRemaining === 840L)
+ assert(status.diskUsed === 140L)
+ assert(status.offHeapUsed === 2L)
+ }
+
+ test("storage status remove non-RDD blocks") {
+ val status = storageStatus1
+ status.removeBlock(TestBlockId("foo"))
+ status.removeBlock(TestBlockId("faa"))
+ assert(status.blocks.size === 1)
+ assert(status.blocks.contains(TestBlockId("fee")))
+ assert(status.memUsed === 10L)
+ assert(status.memRemaining === 990L)
+ assert(status.diskUsed === 20L)
+ assert(status.offHeapUsed === 1L)
+ }
+
+ // For testing add, update, remove, get, and contains etc. for both RDD and non-RDD blocks
+ private def storageStatus2: StorageStatus = {
+ val status = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ assert(status.rddBlocks.isEmpty)
+ status.addBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 10L, 20L, 0L))
+ status.addBlock(TestBlockId("man"), BlockStatus(memAndDisk, 10L, 20L, 0L))
+ status.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 100L, 200L, 1L))
+ status.addBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 10L, 20L, 1L))
+ status.addBlock(RDDBlockId(2, 3), BlockStatus(memAndDisk, 10L, 20L, 0L))
+ status.addBlock(RDDBlockId(2, 4), BlockStatus(memAndDisk, 10L, 40L, 0L))
+ status
+ }
+
+ test("storage status add RDD blocks") {
+ val status = storageStatus2
+ assert(status.blocks.size === 7)
+ assert(status.rddBlocks.size === 5)
+ assert(status.rddBlocks.contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocks.contains(RDDBlockId(1, 1)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 2)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 4)))
+ assert(status.rddBlocksById(0).size === 1)
+ assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocksById(1).size === 1)
+ assert(status.rddBlocksById(1).contains(RDDBlockId(1, 1)))
+ assert(status.rddBlocksById(2).size === 3)
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 2)))
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 4)))
+ assert(status.memUsedByRdd(0) === 10L)
+ assert(status.memUsedByRdd(1) === 100L)
+ assert(status.memUsedByRdd(2) === 30L)
+ assert(status.diskUsedByRdd(0) === 20L)
+ assert(status.diskUsedByRdd(1) === 200L)
+ assert(status.diskUsedByRdd(2) === 80L)
+ assert(status.offHeapUsedByRdd(0) === 1L)
+ assert(status.offHeapUsedByRdd(1) === 1L)
+ assert(status.offHeapUsedByRdd(2) === 1L)
+ assert(status.rddStorageLevel(0) === Some(memAndDisk))
+ assert(status.rddStorageLevel(1) === Some(memAndDisk))
+ assert(status.rddStorageLevel(2) === Some(memAndDisk))
+
+ // Verify default values for RDDs that don't exist
+ assert(status.rddBlocksById(10).isEmpty)
+ assert(status.memUsedByRdd(10) === 0L)
+ assert(status.diskUsedByRdd(10) === 0L)
+ assert(status.offHeapUsedByRdd(10) === 0L)
+ assert(status.rddStorageLevel(10) === None)
+ }
+
+ test("storage status update RDD blocks") {
+ val status = storageStatus2
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 5000L, 0L, 0L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L))
+ status.updateBlock(RDDBlockId(2, 2), BlockStatus(memAndDisk, 0L, 1000L, 0L))
+ assert(status.blocks.size === 7)
+ assert(status.rddBlocks.size === 5)
+ assert(status.rddBlocksById(0).size === 1)
+ assert(status.rddBlocksById(1).size === 1)
+ assert(status.rddBlocksById(2).size === 3)
+ assert(status.memUsedByRdd(0) === 0L)
+ assert(status.memUsedByRdd(1) === 100L)
+ assert(status.memUsedByRdd(2) === 20L)
+ assert(status.diskUsedByRdd(0) === 0L)
+ assert(status.diskUsedByRdd(1) === 200L)
+ assert(status.diskUsedByRdd(2) === 1060L)
+ assert(status.offHeapUsedByRdd(0) === 0L)
+ assert(status.offHeapUsedByRdd(1) === 1L)
+ assert(status.offHeapUsedByRdd(2) === 0L)
+ }
+
+ test("storage status remove RDD blocks") {
+ val status = storageStatus2
+ status.removeBlock(TestBlockId("man"))
+ status.removeBlock(RDDBlockId(1, 1))
+ status.removeBlock(RDDBlockId(2, 2))
+ status.removeBlock(RDDBlockId(2, 4))
+ assert(status.blocks.size === 3)
+ assert(status.rddBlocks.size === 2)
+ assert(status.rddBlocks.contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocks.contains(RDDBlockId(2, 3)))
+ assert(status.rddBlocksById(0).size === 1)
+ assert(status.rddBlocksById(0).contains(RDDBlockId(0, 0)))
+ assert(status.rddBlocksById(1).size === 0)
+ assert(status.rddBlocksById(2).size === 1)
+ assert(status.rddBlocksById(2).contains(RDDBlockId(2, 3)))
+ assert(status.memUsedByRdd(0) === 10L)
+ assert(status.memUsedByRdd(1) === 0L)
+ assert(status.memUsedByRdd(2) === 10L)
+ assert(status.diskUsedByRdd(0) === 20L)
+ assert(status.diskUsedByRdd(1) === 0L)
+ assert(status.diskUsedByRdd(2) === 20L)
+ assert(status.offHeapUsedByRdd(0) === 1L)
+ assert(status.offHeapUsedByRdd(1) === 0L)
+ assert(status.offHeapUsedByRdd(2) === 0L)
+ }
+
+ test("storage status containsBlock") {
+ val status = storageStatus2
+ // blocks that actually exist
+ assert(status.blocks.contains(TestBlockId("dan")) === status.containsBlock(TestBlockId("dan")))
+ assert(status.blocks.contains(TestBlockId("man")) === status.containsBlock(TestBlockId("man")))
+ assert(status.blocks.contains(RDDBlockId(0, 0)) === status.containsBlock(RDDBlockId(0, 0)))
+ assert(status.blocks.contains(RDDBlockId(1, 1)) === status.containsBlock(RDDBlockId(1, 1)))
+ assert(status.blocks.contains(RDDBlockId(2, 2)) === status.containsBlock(RDDBlockId(2, 2)))
+ assert(status.blocks.contains(RDDBlockId(2, 3)) === status.containsBlock(RDDBlockId(2, 3)))
+ assert(status.blocks.contains(RDDBlockId(2, 4)) === status.containsBlock(RDDBlockId(2, 4)))
+ // blocks that don't exist
+ assert(status.blocks.contains(TestBlockId("fan")) === status.containsBlock(TestBlockId("fan")))
+ assert(status.blocks.contains(RDDBlockId(100, 0)) === status.containsBlock(RDDBlockId(100, 0)))
+ }
+
+ test("storage status getBlock") {
+ val status = storageStatus2
+ // blocks that actually exist
+ assert(status.blocks.get(TestBlockId("dan")) === status.getBlock(TestBlockId("dan")))
+ assert(status.blocks.get(TestBlockId("man")) === status.getBlock(TestBlockId("man")))
+ assert(status.blocks.get(RDDBlockId(0, 0)) === status.getBlock(RDDBlockId(0, 0)))
+ assert(status.blocks.get(RDDBlockId(1, 1)) === status.getBlock(RDDBlockId(1, 1)))
+ assert(status.blocks.get(RDDBlockId(2, 2)) === status.getBlock(RDDBlockId(2, 2)))
+ assert(status.blocks.get(RDDBlockId(2, 3)) === status.getBlock(RDDBlockId(2, 3)))
+ assert(status.blocks.get(RDDBlockId(2, 4)) === status.getBlock(RDDBlockId(2, 4)))
+ // blocks that don't exist
+ assert(status.blocks.get(TestBlockId("fan")) === status.getBlock(TestBlockId("fan")))
+ assert(status.blocks.get(RDDBlockId(100, 0)) === status.getBlock(RDDBlockId(100, 0)))
+ }
+
+ test("storage status num[Rdd]Blocks") {
+ val status = storageStatus2
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ status.addBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.addBlock(RDDBlockId(4, 4), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.addBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ status.updateBlock(TestBlockId("Foo"), BlockStatus(memAndDisk, 0L, 10L, 400L))
+ status.updateBlock(RDDBlockId(4, 0), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.updateBlock(RDDBlockId(4, 8), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ status.updateBlock(RDDBlockId(10, 10), BlockStatus(memAndDisk, 0L, 0L, 100L))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ assert(status.rddBlocksById(100).size === status.numRddBlocksById(100))
+ status.removeBlock(RDDBlockId(4, 0))
+ status.removeBlock(RDDBlockId(10, 10))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ // remove a block that doesn't exist
+ status.removeBlock(RDDBlockId(1000, 999))
+ assert(status.blocks.size === status.numBlocks)
+ assert(status.rddBlocks.size === status.numRddBlocks)
+ assert(status.rddBlocksById(4).size === status.numRddBlocksById(4))
+ assert(status.rddBlocksById(10).size === status.numRddBlocksById(10))
+ assert(status.rddBlocksById(1000).size === status.numRddBlocksById(1000))
+ }
+
+ test("storage status memUsed, diskUsed, tachyonUsed") {
+ val status = storageStatus2
+ def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
+ def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
+ def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ status.addBlock(TestBlockId("fire"), BlockStatus(memAndDisk, 4000L, 5000L, 6000L))
+ status.addBlock(TestBlockId("wire"), BlockStatus(memAndDisk, 400L, 500L, 600L))
+ status.addBlock(RDDBlockId(25, 25), BlockStatus(memAndDisk, 40L, 50L, 60L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ status.updateBlock(TestBlockId("dan"), BlockStatus(memAndDisk, 4L, 5L, 6L))
+ status.updateBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 4L, 5L, 6L))
+ status.updateBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 4L, 5L, 6L))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ status.removeBlock(TestBlockId("fire"))
+ status.removeBlock(TestBlockId("man"))
+ status.removeBlock(RDDBlockId(2, 2))
+ status.removeBlock(RDDBlockId(2, 3))
+ assert(status.memUsed === actualMemUsed)
+ assert(status.diskUsed === actualDiskUsed)
+ assert(status.offHeapUsed === actualOffHeapUsed)
+ }
+
+ // For testing StorageUtils.updateRddInfo and StorageUtils.getRddBlockLocations
+ private def stockStorageStatuses: Seq[StorageStatus] = {
+ val status1 = new StorageStatus(BlockManagerId("big", "dog", 1, 1), 1000L)
+ val status2 = new StorageStatus(BlockManagerId("fat", "duck", 2, 2), 2000L)
+ val status3 = new StorageStatus(BlockManagerId("fat", "cat", 3, 3), 3000L)
+ status1.addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status1.addBlock(RDDBlockId(0, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(0, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(0, 3), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status2.addBlock(RDDBlockId(1, 1), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status3.addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ status3.addBlock(RDDBlockId(1, 2), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ Seq(status1, status2, status3)
+ }
+
+ // For testing StorageUtils.updateRddInfo
+ private def stockRDDInfos: Seq[RDDInfo] = {
+ val info0 = new RDDInfo(0, "0", 10, memAndDisk)
+ val info1 = new RDDInfo(1, "1", 3, memAndDisk)
+ Seq(info0, info1)
+ }
+
+ test("StorageUtils.updateRddInfo") {
+ val storageStatuses = stockStorageStatuses
+ val rddInfos = stockRDDInfos
+ StorageUtils.updateRddInfo(rddInfos, storageStatuses)
+ assert(rddInfos(0).storageLevel === memAndDisk)
+ assert(rddInfos(0).numCachedPartitions === 5)
+ assert(rddInfos(0).memSize === 5L)
+ assert(rddInfos(0).diskSize === 10L)
+ assert(rddInfos(0).tachyonSize === 0L)
+ assert(rddInfos(1).storageLevel === memAndDisk)
+ assert(rddInfos(1).numCachedPartitions === 3)
+ assert(rddInfos(1).memSize === 3L)
+ assert(rddInfos(1).diskSize === 6L)
+ assert(rddInfos(1).tachyonSize === 0L)
+ }
+
+ test("StorageUtils.getRddBlockLocations") {
+ val storageStatuses = stockStorageStatuses
+ val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
+ val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
+ assert(blockLocations0.size === 5)
+ assert(blockLocations1.size === 3)
+ assert(blockLocations0.contains(RDDBlockId(0, 0)))
+ assert(blockLocations0.contains(RDDBlockId(0, 1)))
+ assert(blockLocations0.contains(RDDBlockId(0, 2)))
+ assert(blockLocations0.contains(RDDBlockId(0, 3)))
+ assert(blockLocations0.contains(RDDBlockId(0, 4)))
+ assert(blockLocations1.contains(RDDBlockId(1, 0)))
+ assert(blockLocations1.contains(RDDBlockId(1, 1)))
+ assert(blockLocations1.contains(RDDBlockId(1, 2)))
+ assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1"))
+ assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
+ assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 4)) === Seq("cat:3"))
+ assert(blockLocations1(RDDBlockId(1, 0)) === Seq("duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
+ }
+
+ test("StorageUtils.getRddBlockLocations with multiple locations") {
+ val storageStatuses = stockStorageStatuses
+ storageStatuses(0).addBlock(RDDBlockId(1, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ storageStatuses(0).addBlock(RDDBlockId(0, 4), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ storageStatuses(2).addBlock(RDDBlockId(0, 0), BlockStatus(memAndDisk, 1L, 2L, 0L))
+ val blockLocations0 = StorageUtils.getRddBlockLocations(0, storageStatuses)
+ val blockLocations1 = StorageUtils.getRddBlockLocations(1, storageStatuses)
+ assert(blockLocations0.size === 5)
+ assert(blockLocations1.size === 3)
+ assert(blockLocations0(RDDBlockId(0, 0)) === Seq("dog:1", "cat:3"))
+ assert(blockLocations0(RDDBlockId(0, 1)) === Seq("dog:1"))
+ assert(blockLocations0(RDDBlockId(0, 2)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 3)) === Seq("duck:2"))
+ assert(blockLocations0(RDDBlockId(0, 4)) === Seq("dog:1", "cat:3"))
+ assert(blockLocations1(RDDBlockId(1, 0)) === Seq("dog:1", "duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 1)) === Seq("duck:2"))
+ assert(blockLocations1(RDDBlockId(1, 2)) === Seq("cat:3"))
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
new file mode 100644
index 000000000000..6e68dcb3425a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.storage
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.storage._
+
+/**
+ * Test various functionality in the StorageListener that supports the StorageTab.
+ */
+class StorageTabSuite extends FunSuite with BeforeAndAfter {
+ private var bus: LiveListenerBus = _
+ private var storageStatusListener: StorageStatusListener = _
+ private var storageListener: StorageListener = _
+ private val memAndDisk = StorageLevel.MEMORY_AND_DISK
+ private val memOnly = StorageLevel.MEMORY_ONLY
+ private val none = StorageLevel.NONE
+ private val taskInfo = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+ private def rddInfo0 = new RDDInfo(0, "freedom", 100, memOnly)
+ private def rddInfo1 = new RDDInfo(1, "hostage", 200, memOnly)
+ private def rddInfo2 = new RDDInfo(2, "sanity", 300, memAndDisk)
+ private def rddInfo3 = new RDDInfo(3, "grace", 400, memAndDisk)
+ private val bm1 = BlockManagerId("big", "dog", 1, 1)
+
+ before {
+ bus = new LiveListenerBus
+ storageStatusListener = new StorageStatusListener
+ storageListener = new StorageListener(storageStatusListener)
+ bus.addListener(storageStatusListener)
+ bus.addListener(storageListener)
+ }
+
+ test("stage submitted / completed") {
+ assert(storageListener._rddInfoMap.isEmpty)
+ assert(storageListener.rddInfoList.isEmpty)
+
+ // 2 RDDs are known, but none are cached
+ val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 2)
+ assert(storageListener.rddInfoList.isEmpty)
+
+ // 4 RDDs are known, but only 2 are cached
+ val rddInfo2Cached = rddInfo2
+ val rddInfo3Cached = rddInfo3
+ rddInfo2Cached.numCachedPartitions = 1
+ rddInfo3Cached.numCachedPartitions = 1
+ val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+ assert(storageListener._rddInfoMap.size === 4)
+ assert(storageListener.rddInfoList.size === 2)
+
+ // Submitting RDDInfos with duplicate IDs does nothing
+ val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
+ rddInfo0Cached.numCachedPartitions = 1
+ val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0Cached), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
+ assert(storageListener._rddInfoMap.size === 4)
+ assert(storageListener.rddInfoList.size === 2)
+
+ // We only keep around the RDDs that are cached
+ bus.postToAll(SparkListenerStageCompleted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 2)
+ assert(storageListener.rddInfoList.size === 2)
+ }
+
+ test("unpersist") {
+ val rddInfo0Cached = rddInfo0
+ val rddInfo1Cached = rddInfo1
+ rddInfo0Cached.numCachedPartitions = 1
+ rddInfo1Cached.numCachedPartitions = 1
+ val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 2)
+ assert(storageListener.rddInfoList.size === 2)
+ bus.postToAll(SparkListenerUnpersistRDD(0))
+ assert(storageListener._rddInfoMap.size === 1)
+ assert(storageListener.rddInfoList.size === 1)
+ bus.postToAll(SparkListenerUnpersistRDD(4)) // doesn't exist
+ assert(storageListener._rddInfoMap.size === 1)
+ assert(storageListener.rddInfoList.size === 1)
+ bus.postToAll(SparkListenerUnpersistRDD(1))
+ assert(storageListener._rddInfoMap.size === 0)
+ assert(storageListener.rddInfoList.size === 0)
+ }
+
+ test("task end") {
+ val myRddInfo0 = rddInfo0
+ val myRddInfo1 = rddInfo1
+ val myRddInfo2 = rddInfo2
+ val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+ bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+ bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
+ assert(storageListener._rddInfoMap.size === 3)
+ assert(storageListener.rddInfoList.size === 0) // not cached
+ assert(!storageListener._rddInfoMap(0).isCached)
+ assert(!storageListener._rddInfoMap(1).isCached)
+ assert(!storageListener._rddInfoMap(2).isCached)
+
+ // Task end with no updated blocks. This should not change anything.
+ bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, new TaskMetrics))
+ assert(storageListener._rddInfoMap.size === 3)
+ assert(storageListener.rddInfoList.size === 0)
+
+ // Task end with a few new persisted blocks, some from the same RDD
+ val metrics1 = new TaskMetrics
+ metrics1.updatedBlocks = Some(Seq(
+ (RDDBlockId(0, 100), BlockStatus(memAndDisk, 400L, 0L, 0L)),
+ (RDDBlockId(0, 101), BlockStatus(memAndDisk, 0L, 400L, 0L)),
+ (RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
+ (RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
+ ))
+ bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, metrics1))
+ assert(storageListener._rddInfoMap(0).memSize === 800L)
+ assert(storageListener._rddInfoMap(0).diskSize === 400L)
+ assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).numCachedPartitions === 3)
+ assert(storageListener._rddInfoMap(0).isCached)
+ assert(storageListener._rddInfoMap(1).memSize === 0L)
+ assert(storageListener._rddInfoMap(1).diskSize === 240L)
+ assert(storageListener._rddInfoMap(1).tachyonSize === 0L)
+ assert(storageListener._rddInfoMap(1).numCachedPartitions === 1)
+ assert(storageListener._rddInfoMap(1).isCached)
+ assert(!storageListener._rddInfoMap(2).isCached)
+ assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
+
+ // Task end with a few dropped blocks
+ val metrics2 = new TaskMetrics
+ metrics2.updatedBlocks = Some(Seq(
+ (RDDBlockId(0, 100), BlockStatus(none, 0L, 0L, 0L)),
+ (RDDBlockId(1, 20), BlockStatus(none, 0L, 0L, 0L)),
+ (RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
+ (RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
+ ))
+ bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, metrics2))
+ assert(storageListener._rddInfoMap(0).memSize === 400L)
+ assert(storageListener._rddInfoMap(0).diskSize === 400L)
+ assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
+ assert(storageListener._rddInfoMap(0).numCachedPartitions === 2)
+ assert(storageListener._rddInfoMap(0).isCached)
+ assert(!storageListener._rddInfoMap(1).isCached)
+ assert(storageListener._rddInfoMap(2).numCachedPartitions === 0)
+ assert(!storageListener._rddInfoMap(2).isCached)
+ }
+
+}