
Commit 53af15d

Refactor StorageStatus + add a bunch of tests
This commit refactors StorageStatus to keep a set of the RDD IDs that have blocks stored in the status' block manager. The point is to avoid linearly scanning through every single storage status' blocks when a status holds no blocks for the RDD of interest in the first place. This commit also adds a bunch of tests for the StorageStatus and StorageUtils methods. A few minor bugs previously lurked in StorageUtils.blockLocationsFromStorageStatus and StorageUtils.filterStorageStatusByRDD; these are now fixed and tested.

Going forward, we first need to clean up the method signatures to reflect what they actually do. Then we can make things more efficient, now that the stage is set.
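The gist, as a minimal standalone sketch (illustrative names only — Block, Status, and filterByRDD are not the actual Spark classes): each status tracks the RDD IDs it holds blocks for, so filtering by RDD starts with a constant-time set lookup rather than a scan over every block.

    import scala.collection.mutable

    // Illustrative sketch of the pattern this commit introduces: maintain a
    // set of RDD IDs alongside the block map, so that filtering by RDD begins
    // with a cheap membership check instead of a per-block scan.
    object RddIdSetSketch {
      case class Block(rddId: Int, size: Long)

      class Status {
        private val blocks = new mutable.HashMap[Long, Block]  // block ID -> block
        private val rddIds = new mutable.HashSet[Int]          // RDDs with blocks here

        def addBlock(blockId: Long, block: Block): Unit = {
          rddIds += block.rddId
          blocks(blockId) = block
        }

        // O(1) membership check; no scan over this status' blocks
        def containsRDD(rddId: Int): Boolean = rddIds.contains(rddId)

        def blocksForRDD(rddId: Int): Iterable[Block] =
          blocks.values.filter(_.rddId == rddId)
      }

      // Only statuses that actually hold blocks for rddId are scanned further.
      def filterByRDD(statuses: Seq[Status], rddId: Int): Seq[Status] =
        statuses.filter(_.containsRDD(rddId))
    }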
1 parent e3d85b7 commit 53af15d

File tree

7 files changed: +380 −53 lines


core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 3 additions & 2 deletions

@@ -265,8 +265,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
 
   private def storageStatus: Array[StorageStatus] = {
     blockManagerInfo.map { case(blockManagerId, info) =>
-      val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
-      new StorageStatus(blockManagerId, info.maxMem, blockMap)
+      val storageStatus = new StorageStatus(blockManagerId, info.maxMem)
+      info.blocks.foreach { case (id, status) => storageStatus.addBlock(id, status) }
+      storageStatus
     }.toArray
   }

core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala

Lines changed: 1 addition & 5 deletions

@@ -55,11 +55,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
   metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList
-        .flatMap(_.blocks.values.map(_.diskSize))
-        .reduceOption(_ + _)
-        .getOrElse(0L)
-
+      val diskSpaceUsed = storageStatusList.map(_.diskUsed).reduceOption(_ + _).getOrElse(0L)
       diskSpaceUsed / 1024 / 1024
     }
   })

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 4 additions & 5 deletions

@@ -39,9 +39,9 @@ class StorageStatusListener extends SparkListener
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
         if (updatedStatus.storageLevel == StorageLevel.NONE) {
-          storageStatus.blocks.remove(blockId)
+          storageStatus.removeBlock(blockId)
         } else {
-          storageStatus.blocks(blockId) = updatedStatus
+          storageStatus.updateBlock(blockId, updatedStatus)
         }
       }
     }

@@ -50,9 +50,8 @@ class StorageStatusListener extends SparkListener
   /** Update storage status list to reflect the removal of an RDD from the cache */
   private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
-      val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
-      unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks.remove(blockId)
+      storageStatus.rddBlocks(unpersistedRDDId).foreach { case (blockId, _) =>
+        storageStatus.removeBlock(blockId)
       }
     }
   }

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 78 additions & 37 deletions

@@ -28,55 +28,96 @@ import org.apache.spark.annotation.DeveloperApi
  * Storage information for each BlockManager.
  */
 @DeveloperApi
-class StorageStatus(
-    val blockManagerId: BlockManagerId,
-    val maxMem: Long,
-    val blocks: mutable.Map[BlockId, BlockStatus] = mutable.Map.empty) {
+class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+  private val _blocks = new mutable.HashMap[BlockId, BlockStatus]
+  private val _rddIds = new mutable.HashSet[Int]
+
+  /** Return the blocks stored in this block manager as a mapping from ID to status. */
+  def blocks: Map[BlockId, BlockStatus] = _blocks
+
+  /** Add the given block, keeping track of the RDD ID if this is an RDD block. */
+  def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
+    blockId match {
+      case RDDBlockId(rddId, _) => _rddIds.add(rddId)
+      case _ =>
+    }
+    _blocks(blockId) = blockStatus
+  }
 
-  def memUsed = blocks.values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+  /** Update the given block, keeping track of the RDD ID if this is an RDD block. */
+  def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = addBlock(blockId, blockStatus)
+
+  /** Remove the given block, keeping track of the RDD ID if this is an RDD block. */
+  def removeBlock(blockId: BlockId): Option[BlockStatus] = {
+    val removed = _blocks.remove(blockId)
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        if (rddBlocks(rddId).isEmpty) {
+          _rddIds.remove(rddId)
+        }
+      case _ =>
+    }
+    removed
+  }
 
-  def memUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+  /** Return the IDs of the RDDs which have blocks stored in this block manager. */
+  def rddIds: Seq[Int] = _rddIds.toSeq
 
-  def diskUsed = blocks.values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  /** Return the RDD blocks stored in this block manager as a mapping from ID to status. */
+  def rddBlocks: Map[RDDBlockId, BlockStatus] =
+    blocks.filterKeys(_.isInstanceOf[RDDBlockId]).asInstanceOf[Map[RDDBlockId, BlockStatus]]
+
+  /**
+   * Return the RDD blocks with the given RDD ID stored in this block manager as a mapping
+   * from ID to status.
+   */
+  def rddBlocks(rddId: Int): Map[RDDBlockId, BlockStatus] = rddBlocks.filterKeys(_.rddId == rddId)
 
-  def diskUsedByRDD(rddId: Int) =
-    rddBlocks.filterKeys(_.rddId == rddId).values.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  /** Return the memory used by this block manager. */
+  def memUsed: Long = memUsed(blocks.values)
 
+  /** Return the memory used by the given RDD in this block manager. */
+  def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocks(rddId).values)
+
+  /** Return the memory remaining in this block manager. */
   def memRemaining: Long = maxMem - memUsed
 
-  def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) }
+  /** Return the disk space used by this block manager. */
+  def diskUsed: Long = diskUsed(blocks.values)
+
+  /** Return the disk space used by the given RDD in this block manager. */
+  def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocks(rddId).values)
+
+  // Helper methods for computing memory and disk usages
+  private def memUsed(statuses: Iterable[BlockStatus]): Long =
+    statuses.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
+  private def diskUsed(statuses: Iterable[BlockStatus]): Long =
+    statuses.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
 }
 
 /** Helper methods for storage-related objects. */
 private[spark] object StorageUtils {
 
-  /**
-   * Returns basic information of all RDDs persisted in the given SparkContext. This does not
-   * include storage information.
-   */
-  def rddInfoFromSparkContext(sc: SparkContext): Array[RDDInfo] = {
-    sc.persistentRdds.values.map { rdd =>
+  /** Returns storage information of all RDDs persisted in the given SparkContext. */
+  def rddInfoFromStorageStatus(
+      storageStatuses: Seq[StorageStatus],
+      sc: SparkContext): Array[RDDInfo] = {
+    val rddInfos = sc.persistentRdds.values.map { rdd =>
       val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
       val rddNumPartitions = rdd.partitions.size
       val rddStorageLevel = rdd.getStorageLevel
       val rddInfo = new RDDInfo(rdd.id, rddName, rddNumPartitions, rddStorageLevel)
       rddInfo
     }.toArray
-  }
-
-  /** Returns storage information of all RDDs persisted in the given SparkContext. */
-  def rddInfoFromStorageStatus(
-      storageStatuses: Seq[StorageStatus],
-      sc: SparkContext): Array[RDDInfo] = {
-    rddInfoFromStorageStatus(storageStatuses, rddInfoFromSparkContext(sc))
+    rddInfoFromStorageStatus(storageStatuses, rddInfos)
+    rddInfos
   }
 
   /** Returns storage information of all RDDs in the given list. */
   def rddInfoFromStorageStatus(
       storageStatuses: Seq[StorageStatus],
       rddInfos: Seq[RDDInfo],
-      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Unit = {
 
     // Mapping from a block ID -> its status
     val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)

@@ -94,7 +135,7 @@ private[spark] object StorageUtils {
     // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
     val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
 
-    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
+    rddBlockMap.foreach { case (rddId, blocks) =>
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
        blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }

@@ -111,31 +152,31 @@ private[spark] object StorageUtils {
       rddInfo.tachyonSize = tachyonSize
       rddInfo
     }
-    }.toArray
-
-    scala.util.Sorting.quickSort(rddStorageInfos)
-    rddStorageInfos
+    }
   }
 
   /** Returns a mapping from BlockId to the locations of the associated block. */
   def blockLocationsFromStorageStatus(
       storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
+    // An ungrouped list of (blockId, location) pairs
     val blockLocationPairs = storageStatuses.flatMap { storageStatus =>
      storageStatus.blocks.map { case (bid, _) => (bid, storageStatus.blockManagerId.hostPort) }
     }
-    blockLocationPairs.toMap
+    blockLocationPairs
       .groupBy { case (blockId, _) => blockId }
-      .mapValues(_.values.toSeq)
+      .mapValues { rddLocations => rddLocations.map { case (_, location) => location } }
   }
 
   /** Filters the given list of StorageStatus by the given RDD ID. */
   def filterStorageStatusByRDD(
       storageStatuses: Seq[StorageStatus],
       rddId: Int): Array[StorageStatus] = {
-    storageStatuses.map { status =>
-      val filteredBlocks = status.rddBlocks.filterKeys(_.rddId == rddId).toSeq
-      val filteredBlockMap = mutable.Map[BlockId, BlockStatus](filteredBlocks: _*)
-      new StorageStatus(status.blockManagerId, status.maxMem, filteredBlockMap)
-    }.toArray
+    storageStatuses
+      .filter(_.rddIds.contains(rddId))
+      .map { status =>
+        val newStatus = new StorageStatus(status.blockManagerId, status.maxMem)
+        status.rddBlocks(rddId).foreach { case (bid, bstatus) => newStatus.addBlock(bid, bstatus) }
+        newStatus
+      }.toArray
   }
 }
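For reference, a hedged sketch of how the refactored StorageStatus API fits together, roughly what the new tests would exercise. Assumptions: bmId stands in for a BlockManagerId value, and the BlockStatus constructor is taken to be (storageLevel, memSize, diskSize, tachyonSize), matching the fields referenced in the diff above.

    // Hedged usage sketch; bmId is an assumed BlockManagerId value.
    val status = new StorageStatus(bmId, maxMem = 1000L)
    status.addBlock(RDDBlockId(0, 0), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L, 0L))
    status.addBlock(RDDBlockId(1, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))

    assert(status.rddIds.toSet == Set(0, 1))
    assert(status.memUsedByRDD(0) == 100L)
    assert(status.diskUsedByRDD(1) == 200L)
    assert(status.memRemaining == 900L)  // maxMem - memUsed

    // Removing an RDD's last block drops its ID from the tracked set, which
    // is what lets filterStorageStatusByRDD skip this status for RDD 1.
    status.removeBlock(RDDBlockId(1, 0))
    assert(!status.rddIds.contains(1))

Note also the blockLocationsFromStorageStatus fix: the old code called .toMap on the (blockId, location) pairs before grouping, which collapsed a block replicated on multiple block managers down to a single location; grouping the ungrouped pairs first preserves every location.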

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 1 addition & 1 deletion

@@ -51,7 +51,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val storageStatusList = listener.storageStatusList
     val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
     val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
-    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
+    val diskSpaceUsed = storageStatusList.map(_.diskUsed).reduceOption(_ + _).getOrElse(0L)
     val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
     val execInfoSorted = execInfo.sortBy(_.id)

core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala

Lines changed: 1 addition & 3 deletions

@@ -51,9 +51,7 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener
   /** Update each RDD's info to reflect any updates to the RDD's storage status */
   private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
     val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos =
-      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
-    updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
+    StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
   }
 
   /**
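The dropped re-assignment works because rddInfoFromStorageStatus now returns Unit and updates the passed RDDInfo objects in place (their size fields are mutable, as the rddInfo.tachyonSize assignment in the StorageUtils diff shows). The new call pattern, sketched:

    // The RDDInfo instances in _rddInfoMap are mutated in place; nothing to re-assign.
    val rddInfos = _rddInfoMap.values.toSeq
    StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
    // Each info's memSize / diskSize / tachyonSize now reflects current storage.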
