@@ -20,7 +20,6 @@ package org.apache.spark.storage
2020import scala .collection .Map
2121import scala .collection .mutable
2222
23- import org .apache .spark .SparkException
2423import org .apache .spark .annotation .DeveloperApi
2524
2625/**
@@ -48,16 +47,20 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
4847 private val _nonRddBlocks = new mutable.HashMap [BlockId , BlockStatus ]
4948
5049 /**
51- * A map of storage information associated with each RDD .
50+ * Storage information of the blocks that entails memory, disk, and off-heap memory usage .
5251 *
53- * The key is the ID of the RDD, and the value is a 4-tuple of the following:
54- * (size in memory, size on disk, size in tachyon, storage level)
52+ * As with the block maps, we store the storage information separately for RDD blocks and
53+ * non-RDD blocks for the same reason. In particular, RDD storage information is stored
54+ * in a map indexed by the RDD ID to the following 4-tuple:
5555 *
56- * This is updated incrementally on each block added, updated or removed, so as to avoid
57- * linearly scanning through all the blocks within an RDD if we're only interested in a
58- * given RDD's storage information.
56+ * (memory size, disk size, off-heap size, storage level)
57+ *
58+ * We assume that all the blocks that belong to the same RDD have the same storage level.
59+ * This field is not relevant to non-RDD blocks, however, so the storage information for
60+ * non-RDD blocks contains only the first 3 fields (in the same order).
5961 */
6062 private val _rddStorageInfo = new mutable.HashMap [Int , (Long , Long , Long , StorageLevel )]
63+ private var _nonRddStorageInfo : (Long , Long , Long ) = (0L , 0L , 0L )
6164
6265 /**
6366 * Instantiate a StorageStatus with the given initial blocks. This essentially makes a copy of
@@ -93,16 +96,9 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
9396
9497 /** Add the given block to this storage status. If it already exists, overwrite it. */
9598 def addBlock (blockId : BlockId , blockStatus : BlockStatus ): Unit = {
99+ updateStorageInfo(blockId, blockStatus)
96100 blockId match {
97101 case RDDBlockId (rddId, _) =>
98- // Update the storage info of the RDD, keeping track of any existing status for this block
99- val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus .empty)
100- val changeInMem = blockStatus.memSize - oldBlockStatus.memSize
101- val changeInDisk = blockStatus.diskSize - oldBlockStatus.diskSize
102- val changeInTachyon = blockStatus.tachyonSize - oldBlockStatus.tachyonSize
103- val level = blockStatus.storageLevel
104- updateRddStorageInfo(rddId, changeInMem, changeInDisk, changeInTachyon, level)
105- // Actually add the block itself
106102 _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap )(blockId) = blockStatus
107103 case _ =>
108104 _nonRddBlocks(blockId) = blockStatus
@@ -116,12 +112,9 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
116112
117113 /** Remove the given block from this storage status. */
118114 def removeBlock (blockId : BlockId ): Option [BlockStatus ] = {
115+ updateStorageInfo(blockId, BlockStatus .empty)
119116 blockId match {
120117 case RDDBlockId (rddId, _) =>
121- // Update the storage info of the RDD if the block to remove exists
122- getBlock(blockId).foreach { s =>
123- updateRddStorageInfo(rddId, - s.memSize, - s.diskSize, - s.tachyonSize, StorageLevel .NONE )
124- }
125118 // Actually remove the block, if it exists
126119 if (_rddBlocks.contains(rddId)) {
127120 val removed = _rddBlocks(rddId).remove(blockId)
@@ -145,7 +138,7 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
145138 def containsBlock (blockId : BlockId ): Boolean = {
146139 blockId match {
147140 case RDDBlockId (rddId, _) =>
148- _rddBlocks.get(rddId).filter (_.contains(blockId)).isDefined
141+ _rddBlocks.get(rddId).exists (_.contains(blockId))
149142 case _ =>
150143 _nonRddBlocks.contains(blockId)
151144 }
@@ -174,7 +167,7 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
174167 * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
175168 * Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
176169 */
177- def numRddBlocks : Int = _rddBlocks.values.map(_.size).reduceOption( _ + _).getOrElse( 0 )
170+ def numRddBlocks : Int = _rddBlocks.values.map(_.size).fold( 0 )( _ + _)
178171
179172 /**
180173 * Return the number of blocks that belong to the given RDD in O(1) time.
@@ -183,17 +176,20 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
183176 */
184177 def numRddBlocksById (rddId : Int ): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0 )
185178
186- /** Return the memory used by this block manager. */
187- def memUsed : Long = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L )
188-
189179 /** Return the memory remaining in this block manager. */
190180 def memRemaining : Long = maxMem - memUsed
191181
182+ /** Return the memory used by this block manager. */
183+ def memUsed : Long =
184+ _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).fold(0L )(_ + _)
185+
192186 /** Return the disk space used by this block manager. */
193- def diskUsed : Long = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L )
187+ def diskUsed : Long =
188+ _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).fold(0L )(_ + _)
194189
195190 /** Return the off-heap space used by this block manager. */
196- def offHeapUsed : Long = blocks.values.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L )
191+ def offHeapUsed : Long =
192+ _nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).fold(0L )(_ + _)
197193
198194 /** Return the memory used by the given RDD in this block manager in O(1) time. */
199195 def memUsedByRdd (rddId : Int ): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L )
@@ -208,34 +204,50 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
208204 def rddStorageLevel (rddId : Int ): Option [StorageLevel ] = _rddStorageInfo.get(rddId).map(_._4)
209205
210206 /**
211- * Helper function to update the given RDD's storage information based on the (possibly
212- * negative) changes in memory, disk, and off-heap memory usages. This is exposed for testing.
207+ * Update the relevant storage info, taking into account any existing status for this block.
208+ * This is exposed for testing.
213209 */
214- private [spark] def updateRddStorageInfo (
215- rddId : Int ,
216- changeInMem : Long ,
217- changeInDisk : Long ,
218- changeInTachyon : Long ,
219- storageLevel : StorageLevel ): Unit = {
220- val emptyRddInfo = (0L , 0L , 0L , StorageLevel .NONE )
221- val oldRddInfo = _rddStorageInfo.getOrElse(rddId, emptyRddInfo)
222- val newRddInfo = oldRddInfo match {
223- case (oldRddMem, oldRddDisk, oldRddTachyon, _) =>
224- val newRddMem = math.max(oldRddMem + changeInMem, 0L )
225- val newRddDisk = math.max(oldRddDisk + changeInDisk, 0L )
226- val newRddTachyon = math.max(oldRddTachyon + changeInTachyon, 0L )
227- (newRddMem, newRddDisk, newRddTachyon, storageLevel)
210+ private [spark] def updateStorageInfo (blockId : BlockId , newBlockStatus : BlockStatus ): Unit = {
211+ val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus .empty)
212+ val changeInMem = newBlockStatus.memSize - oldBlockStatus.memSize
213+ val changeInDisk = newBlockStatus.diskSize - oldBlockStatus.diskSize
214+ val changeInTachyon = newBlockStatus.tachyonSize - oldBlockStatus.tachyonSize
215+ val level = newBlockStatus.storageLevel
216+
217+ // Compute new info from old info
218+ val oldInfo : (Long , Long , Long ) = blockId match {
219+ case RDDBlockId (rddId, _) =>
220+ _rddStorageInfo.get(rddId)
221+ .map { case (mem, disk, tachyon, _) => (mem, disk, tachyon) }
222+ .getOrElse((0L , 0L , 0L ))
228223 case _ =>
229- // Should never happen
230- throw new SparkException (s " Existing information for $rddId is not of expected type " )
224+ _nonRddStorageInfo
231225 }
232- // If this RDD is no longer persisted, remove it
233- if (newRddInfo._1 + newRddInfo._2 + newRddInfo._3 == 0 ) {
234- _rddStorageInfo.remove(rddId)
235- } else {
236- _rddStorageInfo(rddId) = newRddInfo
226+ val newInfo : (Long , Long , Long ) = oldInfo match {
227+ case (oldMem, oldDisk, oldTachyon) =>
228+ val newMem = math.max(oldMem + changeInMem, 0L )
229+ val newDisk = math.max(oldDisk + changeInDisk, 0L )
230+ val newTachyon = math.max(oldTachyon + changeInTachyon, 0L )
231+ (newMem, newDisk, newTachyon)
232+ }
233+
234+ // Set the correct info
235+ blockId match {
236+ case RDDBlockId (rddId, _) =>
237+ newInfo match {
238+ case (mem, disk, tachyon) =>
239+ // If this RDD is no longer persisted, remove it
240+ if (mem + disk + tachyon == 0 ) {
241+ _rddStorageInfo.remove(rddId)
242+ } else {
243+ _rddStorageInfo(rddId) = (mem, disk, tachyon, level)
244+ }
245+ }
246+ case _ =>
247+ _nonRddStorageInfo = newInfo
237248 }
238249 }
250+
239251}
240252
241253/** Helper methods for storage-related objects. */
@@ -251,11 +263,10 @@ private[spark] object StorageUtils {
251263 // Assume all blocks belonging to the same RDD have the same storage level
252264 val storageLevel = statuses
253265 .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel .NONE )
254- val numCachedPartitions = statuses
255- .map(_.numRddBlocksById(rddId)).reduceOption(_ + _).getOrElse(0 )
256- val memSize = statuses.map(_.memUsedByRdd(rddId)).reduceOption(_ + _).getOrElse(0L )
257- val diskSize = statuses.map(_.diskUsedByRdd(rddId)).reduceOption(_ + _).getOrElse(0L )
258- val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).reduceOption(_ + _).getOrElse(0L )
266+ val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).fold(0 )(_ + _)
267+ val memSize = statuses.map(_.memUsedByRdd(rddId)).fold(0L )(_ + _)
268+ val diskSize = statuses.map(_.diskUsedByRdd(rddId)).fold(0L )(_ + _)
269+ val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).fold(0L )(_ + _)
259270
260271 rddInfo.storageLevel = storageLevel
261272 rddInfo.numCachedPartitions = numCachedPartitions
0 commit comments