@@ -20,35 +20,45 @@ package org.apache.spark.storage
 import scala.collection.Map
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 
 /**
  * :: DeveloperApi ::
- * Storage information for each BlockManager. This class assumes BlockId and BlockStatus are
- * immutable, such that the consumers of this class will not mutate the source of the information.
+ * Storage information for each BlockManager.
+ *
+ * This class assumes BlockId and BlockStatus are immutable, such that the consumers of this
+ * class cannot mutate the source of the information. Accesses are not thread-safe.
  */
 @DeveloperApi
 class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 
   /**
    * Internal representation of the blocks stored in this block manager.
    *
-   * Common consumption patterns of these blocks include
-   * (1) selecting all blocks,
-   * (2) selecting only RDD blocks or,
-   * (3) selecting only the blocks that belong to a specific RDD
-   *
-   * If we are only interested in a fraction of the blocks, as in (2) and (3), we should avoid
-   * linearly scanning through all the blocks, which could be expensive if there are thousands
-   * of blocks on each block manager. We achieve this by storing RDD blocks and non-RDD blocks
-   * separately. In particular, RDD blocks are stored in a map indexed by RDD IDs, so we can
-   * filter out the blocks of interest quickly.
-   *
+   * A common consumption pattern is to access only the blocks that belong to a specific RDD.
+   * For this use case, we should avoid linearly scanning through all the blocks, which could
+   * be expensive if there are thousands of blocks on each block manager. Thus, we need to store
+   * RDD blocks and non-RDD blocks separately. In particular, we store RDD blocks in a map
+   * indexed by RDD IDs, so we can filter out the blocks of interest quickly.
+   *
    * These collections should only be mutated through the add/update/removeBlock methods.
    */
   private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
   private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
 
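// A minimal standalone sketch of the indexing claim above, using plain Scala
// collections rather than the real BlockId/BlockStatus types; all names here are
// illustrative, not Spark's API. With blocks keyed by RDD ID, selecting one RDD's
// blocks is a single map lookup instead of a filter over every block.
object RddBlockIndexSketch {
  import scala.collection.mutable
  private val rddBlocks = new mutable.HashMap[Int, mutable.Map[String, Long]]

  // O(1) lookup of the inner map, vs. scanning all blocks and matching their IDs
  def rddBlocksById(rddId: Int): Map[String, Long] =
    rddBlocks.get(rddId).map(_.toMap).getOrElse(Map.empty)

  def main(args: Array[String]): Unit = {
    rddBlocks(0) = mutable.Map("rdd_0_0" -> 100L)
    rddBlocks(7) = mutable.Map("rdd_7_0" -> 200L, "rdd_7_1" -> 300L)
    assert(rddBlocksById(7).size == 2)
    assert(rddBlocksById(3).isEmpty)  // absent RDD yields no blocks
  }
}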
+  /**
+   * A map of storage information associated with each RDD.
+   *
+   * The key is the ID of the RDD, and the value is a 4-tuple of the following:
+   * (size in memory, size on disk, size in tachyon, storage level)
+   *
+   * This is updated incrementally each time a block is added, updated, or removed, so we can
+   * avoid linearly scanning all the blocks within an RDD when we are only interested in that
+   * RDD's storage information.
+   */
+  private val _rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long, StorageLevel)]
+
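// Standalone sketch of why this side map pays off (simplified: storage level omitted,
// names illustrative). Per-RDD aggregates become a single hash lookup rather than a
// scan over all of that RDD's blocks.
object RddStorageInfoSketch {
  import scala.collection.mutable

  // (size in memory, size on disk, size in tachyon)
  private val rddStorageInfo = new mutable.HashMap[Int, (Long, Long, Long)]

  def memUsedByRdd(rddId: Int): Long =
    rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)  // O(1), no block scan

  def main(args: Array[String]): Unit = {
    rddStorageInfo(0) = (512L, 2048L, 0L)
    assert(memUsedByRdd(0) == 512L)
    assert(memUsedByRdd(1) == 0L)  // an unpersisted RDD reports zero usage
  }
}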
   /**
    * Instantiate a StorageStatus with the given initial blocks. This essentially makes a copy of
    * the original blocks map such that the fate of this storage status is not tied to the source.
@@ -79,6 +89,14 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
     blockId match {
       case RDDBlockId(rddId, _) =>
+        // Update the storage info of the RDD, keeping track of any existing status for this block
+        val oldBlockStatus = getBlock(blockId).getOrElse(BlockStatus.empty)
+        val changeInMem = blockStatus.memSize - oldBlockStatus.memSize
+        val changeInDisk = blockStatus.diskSize - oldBlockStatus.diskSize
+        val changeInTachyon = blockStatus.tachyonSize - oldBlockStatus.tachyonSize
+        val level = blockStatus.storageLevel
+        updateRddStorageInfo(rddId, changeInMem, changeInDisk, changeInTachyon, level)
+        // Actually add the block itself
         _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
       case _ =>
         _nonRddBlocks(blockId) = blockStatus
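// Standalone sketch of the delta bookkeeping in addBlock above (simplified model,
// illustrative names): re-registering an existing block must record only the change
// in size, otherwise the cached per-RDD totals would double count the block.
object AddBlockDeltaSketch {
  import scala.collection.mutable
  final case class Status(memSize: Long, diskSize: Long)
  private val blocks = new mutable.HashMap[String, Status]
  private var rddMemUsed = 0L

  def addBlock(id: String, status: Status): Unit = {
    val old = blocks.getOrElse(id, Status(0L, 0L))
    rddMemUsed += status.memSize - old.memSize  // apply the delta, not the raw size
    blocks(id) = status
  }

  def main(args: Array[String]): Unit = {
    addBlock("rdd_0_0", Status(100L, 0L))
    addBlock("rdd_0_0", Status(40L, 0L))  // same block updated in place
    assert(rddMemUsed == 40L)             // not 140L
  }
}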
@@ -94,6 +112,11 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   def removeBlock(blockId: BlockId): Option[BlockStatus] = {
     blockId match {
       case RDDBlockId(rddId, _) =>
+        // Update the storage info of the RDD if the block to remove exists
+        getBlock(blockId).foreach { s =>
+          updateRddStorageInfo(rddId, -s.memSize, -s.diskSize, -s.tachyonSize, StorageLevel.NONE)
+        }
+        // Actually remove the block, if it exists
         if (_rddBlocks.contains(rddId)) {
           val removed = _rddBlocks(rddId).remove(blockId)
           // If the given RDD has no more blocks left, remove the RDD
@@ -136,33 +159,79 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   }
 
   /**
-   * Return the number of blocks stored in this block manager in O(rdds) time.
+   * Return the number of blocks stored in this block manager in O(RDDs) time.
    * Note that this is much faster than `this.blocks.size`, which is O(blocks) time.
    */
   def numBlocks: Int = {
     _nonRddBlocks.size + _rddBlocks.values.map(_.size).reduceOption(_ + _).getOrElse(0)
   }
 
+  /**
+   * Return the number of RDD blocks stored in this block manager in O(RDDs) time.
+   * Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
+   */
+  def numRddBlocks: Int = _rddBlocks.keys.map(numRddBlocksById).reduceOption(_ + _).getOrElse(0)
+
+  /**
+   * Return the number of blocks that belong to the given RDD in O(1) time.
+   * Note that this is much faster than `this.rddBlocksById(rddId).size`, which is
+   * O(blocks in this RDD) time.
+   */
+  def numRddBlocksById(rddId: Int): Int = _rddBlocks.get(rddId).map(_.size).getOrElse(0)
+
   /** Return the memory used by this block manager. */
-  def memUsed: Long = memUsed(blocks)
+  def memUsed: Long = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
 
   /** Return the memory used by the given RDD in this block manager. */
-  def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocksById(rddId))
+  def memUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
 
   /** Return the memory remaining in this block manager. */
   def memRemaining: Long = maxMem - memUsed
 
   /** Return the disk space used by this block manager. */
-  def diskUsed: Long = diskUsed(blocks)
+  def diskUsed: Long = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
 
   /** Return the disk space used by the given RDD in this block manager. */
-  def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocksById(rddId))
+  def diskUsedByRDD(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._2).getOrElse(0L)
+
+  /** Return the off-heap space used by this block manager. */
+  def offHeapUsed: Long = blocks.values.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
+
+  /** Return the off-heap space used by the given RDD in this block manager. */
+  def offHeapUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._3).getOrElse(0L)
+
+  /** Return the storage level, if any, used by the given RDD in this block manager. */
+  def rddStorageLevel(rddId: Int): Option[StorageLevel] = _rddStorageInfo.get(rddId).map(_._4)
 
-  // Helper methods for computing memory and disk usages
-  private def memUsed(_blocks: Map[BlockId, BlockStatus]): Long =
-    _blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
-  private def diskUsed(_blocks: Map[BlockId, BlockStatus]): Long =
-    _blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  /**
+   * Helper function to update the given RDD's storage information based on the
+   * (possibly negative) changes in memory, disk, and off-heap memory usages.
+   */
+  private def updateRddStorageInfo(
+      rddId: Int,
+      changeInMem: Long,
+      changeInDisk: Long,
+      changeInTachyon: Long,
+      storageLevel: StorageLevel): Unit = {
+    val emptyRddInfo = (0L, 0L, 0L, StorageLevel.NONE)
+    val oldRddInfo = _rddStorageInfo.getOrElse(rddId, emptyRddInfo)
+    val newRddInfo = oldRddInfo match {
+      case (oldRddMem, oldRddDisk, oldRddTachyon, _) =>
+        val newRddMem = math.max(oldRddMem + changeInMem, 0L)
+        val newRddDisk = math.max(oldRddDisk + changeInDisk, 0L)
+        val newRddTachyon = math.max(oldRddTachyon + changeInTachyon, 0L)
+        (newRddMem, newRddDisk, newRddTachyon, storageLevel)
+      case _ =>
+        // Should never happen
+        throw new SparkException(s"Existing information for $rddId is not of expected type")
+    }
+    // If this RDD is no longer persisted, remove it
+    if (newRddInfo._1 + newRddInfo._2 + newRddInfo._3 == 0) {
+      _rddStorageInfo.remove(rddId)
+    } else {
+      _rddStorageInfo(rddId) = newRddInfo
+    }
+  }
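// Standalone sketch of the clamp-and-evict policy in the helper above (simplified to
// two sizes, illustrative names): sizes never go negative, and an entry whose sizes
// all reach zero is dropped so unpersisted RDDs do not linger in the map.
object UpdateInfoSketch {
  import scala.collection.mutable
  private val info = new mutable.HashMap[Int, (Long, Long)]  // (mem, disk) only

  def update(rddId: Int, dMem: Long, dDisk: Long): Unit = {
    val (oldMem, oldDisk) = info.getOrElse(rddId, (0L, 0L))
    val newMem = math.max(oldMem + dMem, 0L)
    val newDisk = math.max(oldDisk + dDisk, 0L)
    if (newMem + newDisk == 0L) info.remove(rddId) else info(rddId) = (newMem, newDisk)
  }

  def main(args: Array[String]): Unit = {
    update(5, 100L, 0L)
    update(5, -100L, 0L)       // last block removed
    assert(!info.contains(5))  // entry evicted, not left at (0, 0)
  }
}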
 }
 
 /** Helper methods for storage-related objects. */
@@ -172,32 +241,20 @@ private[spark] object StorageUtils {
    * Update the given list of RDDInfo with the given list of storage statuses.
    * This method overwrites the old values stored in the RDDInfo's.
    */
-  def updateRddInfo(
-      rddInfos: Seq[RDDInfo],
-      storageStatuses: Seq[StorageStatus],
-      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Unit = {
-
+  def updateRddInfo(rddInfos: Seq[RDDInfo], statuses: Seq[StorageStatus]): Unit = {
     rddInfos.foreach { rddInfo =>
       val rddId = rddInfo.id
-
-      // Collect all block statuses that belong to the given RDD
-      val newBlocks = updatedBlocks.filter { case (bid, _) =>
-        bid.asRDDId.filter(_.rddId == rddId).isDefined
-      }
-      val newBlockIds = newBlocks.map { case (bid, _) => bid }.toSet
-      val oldBlocks = storageStatuses
-        .flatMap(_.rddBlocksById(rddId))
-        .filter { case (bid, _) => !newBlockIds.contains(bid) } // avoid double counting
-      val blocks = (oldBlocks ++ newBlocks).map { case (_, bstatus) => bstatus }
-      val persistedBlocks = blocks.filter(_.isCached)
-
       // Assume all blocks belonging to the same RDD have the same storage level
-      val storageLevel = blocks.headOption.map(_.storageLevel).getOrElse(StorageLevel.NONE)
-      val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
-      val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
-      val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
+      val storageLevel = statuses
+        .map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
+      val numCachedPartitions = statuses
+        .map(_.numRddBlocksById(rddId)).reduceOption(_ + _).getOrElse(0)
+      val memSize = statuses.map(_.memUsedByRDD(rddId)).reduceOption(_ + _).getOrElse(0L)
+      val diskSize = statuses.map(_.diskUsedByRDD(rddId)).reduceOption(_ + _).getOrElse(0L)
+      val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).reduceOption(_ + _).getOrElse(0L)
+
       rddInfo.storageLevel = storageLevel
-      rddInfo.numCachedPartitions = persistedBlocks.length
+      rddInfo.numCachedPartitions = numCachedPartitions
       rddInfo.memSize = memSize
       rddInfo.diskSize = diskSize
       rddInfo.tachyonSize = tachyonSize
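// Standalone sketch of the aggregation in updateRddInfo above (simplified model,
// illustrative names and a String in place of StorageLevel): per-RDD totals are
// summed across block managers, and the storage level is taken from the first
// manager that reports one.
object UpdateRddInfoSketch {
  final case class Status(memByRdd: Map[Int, Long], level: Map[Int, String])

  def aggregate(statuses: Seq[Status], rddId: Int): (Long, String) = {
    val memSize = statuses.map(_.memByRdd.getOrElse(rddId, 0L)).sum
    val level = statuses.flatMap(_.level.get(rddId)).headOption.getOrElse("NONE")
    (memSize, level)
  }

  def main(args: Array[String]): Unit = {
    val s1 = Status(Map(0 -> 100L), Map(0 -> "MEMORY_ONLY"))
    val s2 = Status(Map(0 -> 50L), Map(0 -> "MEMORY_ONLY"))
    assert(aggregate(Seq(s1, s2), 0) == (150L, "MEMORY_ONLY"))
  }
}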
@@ -207,11 +264,9 @@ private[spark] object StorageUtils {
   /**
    * Return mapping from block ID to its locations for each block that belongs to the given RDD.
    */
-  def getRddBlockLocations(
-      storageStatuses: Seq[StorageStatus],
-      rddId: Int): Map[BlockId, Seq[String]] = {
+  def getRddBlockLocations(statuses: Seq[StorageStatus], rddId: Int): Map[BlockId, Seq[String]] = {
     val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
-    storageStatuses.foreach { status =>
+    statuses.foreach { status =>
       status.rddBlocksById(rddId).foreach { case (bid, _) =>
         val location = status.blockManagerId.hostPort
         blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
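// Standalone sketch of the location-map construction above (simplified: plain strings
// stand in for BlockId and BlockManagerId, names illustrative): each block manager
// contributes its host:port to every block of the RDD it stores, so a replicated
// block accumulates multiple locations.
object BlockLocationsSketch {
  import scala.collection.mutable

  def locations(statuses: Seq[(String, Seq[String])]): Map[String, Seq[String]] = {
    val blockLocations = new mutable.HashMap[String, mutable.ListBuffer[String]]
    statuses.foreach { case (hostPort, blockIds) =>
      blockIds.foreach { bid =>
        blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += hostPort
      }
    }
    blockLocations.mapValues(_.toSeq).toMap
  }

  def main(args: Array[String]): Unit = {
    val result = locations(Seq(
      ("host1:7077", Seq("rdd_0_0", "rdd_0_1")),
      ("host2:7077", Seq("rdd_0_0"))))  // rdd_0_0 is replicated
    assert(result("rdd_0_0") == Seq("host1:7077", "host2:7077"))
  }
}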