Commit b66b6b0

Use more efficient underlying data structures for blocks
Previously we were still linearly traversing all the blocks held by each storage status. Now we index by the RDD ID and return only the blocks of interest to us.
1 parent 6a7b7c0 commit b66b6b0
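
The gist of the change: StorageStatus used to keep one flat block map, so any per-RDD query scanned every block; it now keeps RDD blocks in a map of maps keyed by RDD ID. A minimal runnable sketch of that indexing idea (the RDDBlockId and BlockStatus case classes here are simplified stand-ins, not the real Spark types modified below):

import scala.collection.mutable

// Hypothetical stand-ins for Spark's block types, for illustration only
case class RDDBlockId(rddId: Int, splitIndex: Int)
case class BlockStatus(memSize: Long, diskSize: Long)

object IndexingSketch extends App {
  val allBlocks = Seq(
    RDDBlockId(1, 0) -> BlockStatus(100L, 0L),
    RDDBlockId(1, 1) -> BlockStatus(100L, 0L),
    RDDBlockId(2, 0) -> BlockStatus(50L, 0L))

  // Before: one flat map, so selecting RDD 1's blocks inspects every block -- O(all blocks)
  val flat = mutable.HashMap(allBlocks: _*)
  val viaScan = flat.filter { case (bid, _) => bid.rddId == 1 }

  // After: blocks indexed by RDD ID, so the same query is one hash lookup -- O(blocks of RDD 1)
  val indexed = new mutable.HashMap[Int, mutable.Map[RDDBlockId, BlockStatus]]
  allBlocks.foreach { case (bid, status) =>
    indexed.getOrElseUpdate(bid.rddId, new mutable.HashMap)(bid) = status
  }
  val viaIndex = indexed.get(1).map(_.toSeq).getOrElse(Seq.empty)

  assert(viaScan.size == viaIndex.size)  // both yield the same two blocks
}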

6 files changed, +192 -175 lines changed

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ class StorageStatusListener extends SparkListener {
   /** Update storage status list to reflect the removal of an RDD from the cache */
   private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
-      storageStatus.rddBlocks(unpersistedRDDId).foreach { case (blockId, _) =>
+      storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
         storageStatus.removeBlock(blockId)
       }
     }
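
With the index in place, unpersisting an RDD now touches only that RDD's blocks on each block manager, since rddBlocksById is a direct hash lookup; the old rddBlocks(unpersistedRDDId) call had to filter the full block map of every storage status.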

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 85 additions & 59 deletions

@@ -20,7 +20,6 @@ package org.apache.spark.storage
 import scala.collection.Map
 import scala.collection.mutable
 
-import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -31,9 +30,24 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 
-  // This should not be mutated directly, but through the add/update/removeBlock methods
-  private val _blocks = new mutable.HashMap[BlockId, BlockStatus]
-  private val _rddIds = new mutable.HashSet[Int]
+  /**
+   * Internal representation of the blocks stored in this block manager.
+   *
+   * Common consumption patterns of these blocks include
+   * (1) selecting all blocks,
+   * (2) selecting only RDD blocks, or
+   * (3) selecting only the blocks that belong to a specific RDD.
+   *
+   * If we are only interested in a fraction of the blocks, as in (2) and (3), we should avoid
+   * linearly scanning through all the blocks, which could be expensive if there are thousands
+   * of blocks on each block manager. We achieve this by storing RDD blocks and non-RDD blocks
+   * separately. In particular, RDD blocks are stored in a map indexed by RDD IDs, so we can
+   * filter out the blocks of interest quickly.
+   *
+   * These collections should only be mutated through the add/update/removeBlock methods.
+   */
+  private val _rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, BlockStatus]]
+  private val _nonRddBlocks = new mutable.HashMap[BlockId, BlockStatus]
 
   /**
    * Instantiate a StorageStatus with the given initial blocks. This essentially makes a copy of
@@ -44,67 +58,94 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
     initialBlocks.foreach { case (blockId, blockStatus) => addBlock(blockId, blockStatus) }
   }
 
-  /** Return the blocks stored in this block manager as a mapping from ID to status. */
-  def blocks: Map[BlockId, BlockStatus] = _blocks
+  /** Return the blocks stored in this block manager. */
+  def blocks: Seq[(BlockId, BlockStatus)] = {
+    _nonRddBlocks.toSeq ++ rddBlocks.toSeq
+  }
+
+  /** Return the RDD blocks stored in this block manager. */
+  def rddBlocks: Seq[(BlockId, BlockStatus)] = {
+    _rddBlocks.flatMap { case (_, blocks) => blocks }.toSeq
+  }
+
+  /** Return the blocks that belong to the given RDD stored in this block manager. */
+  def rddBlocksById(rddId: Int): Seq[(BlockId, BlockStatus)] = {
+    _rddBlocks.get(rddId).map(_.toSeq).getOrElse(Seq.empty)
+  }
 
-  /** Add the given block, keeping track of the RDD ID if this is an RDD block. */
+  /** Add the given block to this storage status. */
   def addBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = {
     blockId match {
-      case RDDBlockId(rddId, _) => _rddIds.add(rddId)
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.getOrElseUpdate(rddId, new mutable.HashMap)(blockId) = blockStatus
       case _ =>
+        _nonRddBlocks(blockId) = blockStatus
     }
-    _blocks(blockId) = blockStatus
   }
 
-  /** Update the given block, keeping track of the RDD ID if this is an RDD block. */
+  /** Update the given block in this storage status. If it doesn't already exist, add it. */
   def updateBlock(blockId: BlockId, blockStatus: BlockStatus): Unit = addBlock(blockId, blockStatus)
 
-  /** Remove the given block, keeping track of the RDD ID if this is an RDD block. */
+  /** Remove the given block from this storage status. */
   def removeBlock(blockId: BlockId): Option[BlockStatus] = {
-    val removed = _blocks.remove(blockId)
     blockId match {
       case RDDBlockId(rddId, _) =>
-        if (rddBlocks(rddId).isEmpty) {
-          _rddIds.remove(rddId)
+        if (_rddBlocks.contains(rddId)) {
+          val removed = _rddBlocks(rddId).remove(blockId)
+          // If the given RDD has no more blocks left, remove the RDD
+          if (_rddBlocks(rddId).isEmpty) {
+            _rddBlocks.remove(rddId)
+          }
+          removed
+        } else {
+          None
         }
       case _ =>
+        _nonRddBlocks.remove(blockId)
     }
-    removed
   }
 
-  /** Return the IDs of the RDDs which have blocks stored in this block manager. */
-  def rddIds: Seq[Int] = _rddIds.toSeq
-
-  /** Return the RDD blocks stored in this block manager as a mapping from ID to status. */
-  def rddBlocks: Map[RDDBlockId, BlockStatus] =
-    blocks.filterKeys(_.isInstanceOf[RDDBlockId]).asInstanceOf[Map[RDDBlockId, BlockStatus]]
+  /**
+   * Return whether the given block is stored in this block manager in O(1) time.
+   * Note that the alternative of doing this through `blocks` is O(blocks), which is much slower.
+   */
+  def containsBlock(blockId: BlockId): Boolean = {
+    blockId match {
+      case RDDBlockId(rddId, _) =>
+        _rddBlocks.get(rddId).filter(_.contains(blockId)).isDefined
+      case _ =>
+        _nonRddBlocks.contains(blockId)
+    }
+  }
 
   /**
-   * Return the RDD blocks with the given RDD ID stored in this block manager as a mapping
-   * from ID to status.
+   * Return the number of blocks in O(R) time, where R is the number of distinct RDD IDs.
+   * Note that the alternative of doing this through `blocks` is O(blocks), which is much slower.
    */
-  def rddBlocks(rddId: Int): Map[RDDBlockId, BlockStatus] = rddBlocks.filterKeys(_.rddId == rddId)
+  def numBlocks: Int = {
+    _nonRddBlocks.size + _rddBlocks.values.map(_.size).reduceOption(_ + _).getOrElse(0)
+  }
 
   /** Return the memory used by this block manager. */
-  def memUsed: Long = memUsed(blocks.values)
+  def memUsed: Long = memUsed(blocks)
 
   /** Return the memory used by the given RDD in this block manager. */
-  def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocks(rddId).values)
+  def memUsedByRDD(rddId: Int): Long = memUsed(rddBlocksById(rddId))
 
   /** Return the memory remaining in this block manager. */
   def memRemaining: Long = maxMem - memUsed
 
   /** Return the disk space used by this block manager. */
-  def diskUsed: Long = diskUsed(blocks.values)
+  def diskUsed: Long = diskUsed(blocks)
 
   /** Return the disk space used by the given RDD in this block manager. */
-  def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocks(rddId).values)
+  def diskUsedByRDD(rddId: Int): Long = diskUsed(rddBlocksById(rddId))
 
   // Helper methods for computing memory and disk usages
-  private def memUsed(statuses: Iterable[BlockStatus]): Long =
-    statuses.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
-  private def diskUsed(statuses: Iterable[BlockStatus]): Long =
-    statuses.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
+  private def memUsed(_blocks: Seq[(BlockId, BlockStatus)]): Long =
+    _blocks.map { case (_, s) => s.memSize }.reduceOption(_ + _).getOrElse(0L)
+  private def diskUsed(_blocks: Seq[(BlockId, BlockStatus)]): Long =
+    _blocks.map { case (_, s) => s.diskSize }.reduceOption(_ + _).getOrElse(0L)
 }
 
 /** Helper methods for storage-related objects. */
@@ -123,18 +164,13 @@ private[spark] object StorageUtils {
     val rddId = rddInfo.id
 
     // Collect all block statuses that belong to the given RDD
-    val newBlocks = updatedBlocks.filter { case (b, _) =>
-      b.asRDDId.filter(_.rddId == rddId).isDefined
+    val newBlocks = updatedBlocks.filter { case (bid, _) =>
+      bid.asRDDId.filter(_.rddId == rddId).isDefined
     }
     val newBlockIds = newBlocks.map { case (bid, _) => bid }.toSet
-    val oldBlocks = storageStatuses.flatMap { s =>
-      if (s.rddIds.contains(rddId)) {
-        // If the block is being updated, leave it out here in favor of the new status
-        s.rddBlocks(rddId).filterKeys { bid => !newBlockIds.contains(bid) }
-      } else {
-        Seq.empty
-      }
-    }
+    val oldBlocks = storageStatuses
+      .flatMap(_.rddBlocksById(rddId))
+      .filter { case (bid, _) => !newBlockIds.contains(bid) } // avoid double counting
     val blocks = (oldBlocks ++ newBlocks).map { case (_, bstatus) => bstatus }
     val persistedBlocks = blocks.filter(_.isCached)
 
@@ -151,30 +187,20 @@ private[spark] object StorageUtils {
     }
   }
 
-  /** Return a mapping from block ID to the locations of the associated block. */
-  def getBlockLocations(storageStatuses: Seq[StorageStatus]): Map[BlockId, Seq[String]] = {
+  /**
+   * Return a mapping from block ID to its locations for each block that belongs to the given RDD.
+   */
+  def getRDDBlockLocations(
+      storageStatuses: Seq[StorageStatus],
+      rddId: Int): Map[BlockId, Seq[String]] = {
     val blockLocations = new mutable.HashMap[BlockId, mutable.ListBuffer[String]]
     storageStatuses.foreach { status =>
-      status.blocks.foreach { case (bid, _) =>
+      status.rddBlocksById(rddId).foreach { case (bid, _) =>
         val location = status.blockManagerId.hostPort
         blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += location
       }
     }
     blockLocations
   }
 
-  /**
-   * Return a filtered list of storage statuses in which the only blocks remaining are the ones
-   * that belong to given RDD.
-   */
-  def filterByRDD(storageStatuses: Seq[StorageStatus], rddId: Int): Seq[StorageStatus] = {
-    storageStatuses
-      .filter(_.rddIds.contains(rddId))
-      .map { status =>
-        new StorageStatus(
-          status.blockManagerId,
-          status.maxMem,
-          status.rddBlocks(rddId).asInstanceOf[Map[BlockId, BlockStatus]])
-      }
-  }
 }
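
A subtle point in the new removeBlock above: when an RDD's last block is removed, its inner map is dropped from the index, which keeps rddBlocksById of an unpersisted RDD empty and keeps R small for numBlocks. A runnable sketch of just that bookkeeping, using a simplified (rddId, splitIndex) tuple and a Long size in place of the real BlockId/BlockStatus types:

import scala.collection.mutable

object RemoveBlockSketch extends App {
  type BlockId = (Int, Int)  // simplified stand-in: (rddId, splitIndex)
  val rddBlocks = new mutable.HashMap[Int, mutable.Map[BlockId, Long]]

  def addBlock(id: BlockId, size: Long): Unit =
    rddBlocks.getOrElseUpdate(id._1, new mutable.HashMap)(id) = size

  def removeBlock(id: BlockId): Option[Long] =
    rddBlocks.get(id._1) match {
      case Some(blocks) =>
        val removed = blocks.remove(id)
        // If the given RDD has no more blocks left, remove the RDD from the index
        if (blocks.isEmpty) rddBlocks.remove(id._1)
        removed
      case None => None
    }

  addBlock((1, 0), 100L)
  addBlock((1, 1), 200L)
  removeBlock((1, 0))
  assert(rddBlocks.contains(1))   // RDD 1 still holds a block
  removeBlock((1, 1))
  assert(!rddBlocks.contains(1))  // last block gone, index entry cleaned up
}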

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 1 addition & 1 deletion

@@ -145,7 +145,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.size
+    val rddBlocks = status.numBlocks
     val memUsed = status.memUsed
     val maxMem = status.maxMem
    val diskUsed = status.diskUsed
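
The switch from blocks.size to numBlocks is not just cosmetic: blocks now materializes a fresh Seq from both internal maps on every call, so counting through it would cost O(all blocks) plus an allocation, while numBlocks only sums the sizes of the underlying maps, as documented in the StorageUtils diff above.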

core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala

Lines changed: 7 additions & 6 deletions

@@ -45,12 +45,13 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") {
     val workerTable = UIUtils.listingTable(workerHeader, workerRow, workers)
 
     // Block table
-    val filteredStorageStatusList = StorageUtils.filterByRDD(storageStatusList, rddId)
-    val blockStatuses = filteredStorageStatusList.flatMap(_.blocks).sortWith(_._1.name < _._1.name)
-    val blockLocations = StorageUtils.getBlockLocations(filteredStorageStatusList)
-    val blocks = blockStatuses.map { case (blockId, status) =>
-      (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
-    }
+    val blockLocations = StorageUtils.getRDDBlockLocations(storageStatusList, rddId)
+    val blocks = storageStatusList
+      .flatMap(_.rddBlocksById(rddId))
+      .sortWith(_._1.name < _._1.name)
+      .map { case (blockId, status) =>
+        (blockId, status, blockLocations.get(blockId).getOrElse(Seq[String]("Unknown")))
+      }
     val blockTable = UIUtils.listingTable(blockHeader, blockRow, blocks)
 
     val content =
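
For context, the page now joins each of the RDD's blocks with the location map from getRDDBlockLocations instead of filtering whole copies of the storage statuses. A standalone sketch of that join, with made-up host:port strings and string block IDs standing in for the real types:

import scala.collection.mutable

object BlockTableSketch extends App {
  // (hostPort, blocks held there) -- a stand-in for one StorageStatus per executor
  val statuses = Seq(
    ("host1:7070", Map("rdd_1_0" -> 100L, "rdd_1_1" -> 200L)),
    ("host2:7070", Map("rdd_1_1" -> 200L)))  // rdd_1_1 is replicated on two executors

  // Equivalent of StorageUtils.getRDDBlockLocations: block ID -> every location holding it
  val blockLocations = new mutable.HashMap[String, mutable.ListBuffer[String]]
  statuses.foreach { case (hostPort, blocks) =>
    blocks.keys.foreach { bid =>
      blockLocations.getOrElseUpdate(bid, mutable.ListBuffer.empty) += hostPort
    }
  }

  // Equivalent of the RDDPage block table: (id, status, locations), sorted by block name
  val rows = statuses
    .flatMap { case (_, blocks) => blocks }
    .sortWith(_._1 < _._1)
    .map { case (bid, size) =>
      (bid, size, blockLocations.get(bid).getOrElse(Seq("Unknown")))
    }
  rows.foreach(println)  // rdd_1_1 appears once per replica, as on the real page
}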

core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala

Lines changed: 36 additions & 36 deletions

@@ -41,13 +41,13 @@ class StorageStatusListenerSuite extends FunSuite {
     assert(listener.executorIdToStorageStatus.get("big").isDefined)
     assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
     assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
     listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
     assert(listener.executorIdToStorageStatus.size === 2)
     assert(listener.executorIdToStorageStatus.get("fat").isDefined)
     assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
     assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
 
     // Block manager remove
     listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
@@ -67,14 +67,14 @@
     val taskMetrics = new TaskMetrics
 
     // Task end with no updated blocks
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }
 
   test("task end with updated blocks") {
@@ -90,20 +90,20 @@
     taskMetrics2.updatedBlocks = Some(Seq(block3))
 
     // Task end with new blocks
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
 
     // Task end with dropped blocks
     val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
@@ -112,17 +112,17 @@
     taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
     taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
-    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
+    assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
-    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
-    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
-    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
+    assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
   }
 
   test("unpersist RDD") {
@@ -137,16 +137,16 @@
     taskMetrics2.updatedBlocks = Some(Seq(block3))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
     listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
 
     // Unpersist RDD
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
-    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
-    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
     listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
-    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
   }
 }
