Commit 8e91921

Iterate through a filtered set of blocks when updating RDDInfo
This particular commit is the whole point of this PR. In the existing code we unconditionally iterate through all blocks in all block managers whenever we want to update an RDDInfo. Now, we filter out only the blocks of interest to us in advance, so we don't end up constructing a huge map and doing a groupBy on it.
1 parent 7b2c4aa commit 8e91921
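
To make the performance point concrete, here is a minimal standalone Scala sketch of the two lookup patterns. The Block type and both method names are hypothetical stand-ins invented for illustration, not Spark's classes; the point is only that the old path materializes and groups a map over every block in every block manager, while the new path touches just the blocks of the RDDs being updated.

object FilteredLookupSketch {
  case class Block(rddId: Int, size: Long)

  // Old pattern: build one map over all blocks, then groupBy -- this
  // allocates and groups entries for every RDD, even ones we ignore.
  def oldStyle(allBlocks: Seq[Block], rddIds: Seq[Int]): Map[Int, Seq[Block]] = {
    val grouped = allBlocks.groupBy(_.rddId)
    rddIds.map { id => id -> grouped.getOrElse(id, Seq.empty) }.toMap
  }

  // New pattern: filter out only the blocks of interest in advance;
  // no global map or groupBy is ever constructed.
  def newStyle(allBlocks: Seq[Block], rddIds: Seq[Int]): Map[Int, Seq[Block]] = {
    rddIds.map { id => id -> allBlocks.filter(_.rddId == id) }.toMap
  }

  def main(args: Array[String]): Unit = {
    val blocks = Seq(Block(0, 10L), Block(1, 20L), Block(0, 5L), Block(2, 7L))
    assert(oldStyle(blocks, Seq(0)) == newStyle(blocks, Seq(0)))
    println(newStyle(blocks, Seq(0))) // Map(0 -> List(Block(0,10), Block(0,5)))
  }
}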

File tree

4 files changed: +78 / -75 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion

@@ -841,7 +841,7 @@ class SparkContext(config: SparkConf) extends Logging {
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
     val rddInfos = StorageUtils.makeRddInfo(this)
-    StorageUtils.updateRddInfo(getExecutorStorageStatus, rddInfos)
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
     rddInfos.toArray
   }

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 30 additions & 27 deletions

@@ -121,36 +121,39 @@ private[spark] object StorageUtils {
     }
   }

-  /** Update the given list of RDDInfo with the given list of storage statuses. */
-  def updateRddInfo(storageStatuses: Seq[StorageStatus], rddInfos: Seq[RDDInfo]): Unit = {
-    // Mapping from a block ID -> its status
-    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
-
-    // Mapping from RDD ID -> an array of associated BlockStatuses
-    val rddBlockMap = blockMap
-      .groupBy { case (k, _) => k.rddId }
-      .mapValues(_.values.toArray)
-
-    // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
-    val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
-
-    rddBlockMap.foreach { case (rddId, blocks) =>
-      // Add up memory, disk and Tachyon sizes
-      val persistedBlocks =
-        blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
-      val _storageLevel =
-        if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
+  /**
+   * Update the given list of RDDInfo with the given list of storage statuses.
+   * This method overwrites the old values stored in the RDDInfo's.
+   */
+  def updateRddInfo(
+      rddInfos: Seq[RDDInfo],
+      storageStatuses: Seq[StorageStatus],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Unit = {
+    rddInfos.foreach { rddInfo =>
+      val rddId = rddInfo.id
+
+      // Collect all block statuses that belong to the given RDD
+      val newBlocks = updatedBlocks
+        .collect { case (bid: RDDBlockId, bstatus) => (bid, bstatus) }
+        .filter { case (bid, _) => bid.rddId == rddId }
+      val newBlockIds = newBlocks.map { case (bid, _) => bid }.toSet
+      val oldBlocks = storageStatuses
+        .filter(_.rddIds.contains(rddId))
+        .flatMap(_.rddBlocks(rddId))
+        .filter { case (bid, _) => !newBlockIds.contains(bid) } // avoid duplicates
+      val blocks = (oldBlocks ++ newBlocks).map { case (_, bstatus) => bstatus }
+      val persistedBlocks = blocks.filter { s => s.memSize + s.diskSize + s.tachyonSize > 0 }
+
+      // Assume all blocks belonging to the same RDD have the same storage level
+      val storageLevel = blocks.headOption.map(_.storageLevel).getOrElse(StorageLevel.NONE)
       val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
       val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
       val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
-      rddInfoMap.get(rddId).map { rddInfo =>
-        rddInfo.storageLevel = _storageLevel
-        rddInfo.numCachedPartitions = persistedBlocks.length
-        rddInfo.memSize = memSize
-        rddInfo.diskSize = diskSize
-        rddInfo.tachyonSize = tachyonSize
-        rddInfo
-      }
+      rddInfo.storageLevel = storageLevel
+      rddInfo.numCachedPartitions = persistedBlocks.length
+      rddInfo.memSize = memSize
+      rddInfo.diskSize = diskSize
+      rddInfo.tachyonSize = tachyonSize
     }
   }
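
The merge in the new updateRddInfo gives statuses from updatedBlocks precedence over the stored ones with the same block ID. Below is a toy sketch of just that rule, under the assumption that block IDs are plain strings; all names here are hypothetical stand-ins, not Spark's API.

object MergeRuleSketch {
  case class Status(memSize: Long) // hypothetical stand-in for BlockStatus

  def main(args: Array[String]): Unit = {
    val oldBlocks = Seq("rdd_0_0" -> Status(10L), "rdd_0_1" -> Status(10L))
    val newBlocks = Seq("rdd_0_1" -> Status(0L)) // partition 1 was just dropped
    val newBlockIds = newBlocks.map { case (id, _) => id }.toSet
    // Drop the stale stored status for any block that also appears in
    // newBlocks, then append the fresh statuses -- mirroring the
    // "avoid duplicates" filter in the diff above.
    val merged = oldBlocks.filterNot { case (id, _) => newBlockIds(id) } ++ newBlocks
    assert(merged == Seq("rdd_0_0" -> Status(10L), "rdd_0_1" -> Status(0L)))
  }
}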

core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala

Lines changed: 2 additions & 2 deletions

@@ -49,8 +49,8 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq

   /** Update each RDD's info to reflect any updates in the RDD's storage status */
-  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Unit = {
-    StorageUtils.updateRddInfo(storageStatusList, _rddInfoMap.values.toSeq)
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
+    StorageUtils.updateRddInfo(_rddInfoMap.values.toSeq, storageStatusList, updatedBlocks)
   }

   /**

core/src/test/scala/org/apache/spark/storage/StorageSuite.scala

Lines changed: 45 additions & 45 deletions

@@ -182,7 +182,7 @@ class StorageSuite extends FunSuite {
   test("StorageUtils.updateRddInfo") {
     val storageStatuses = stockStorageStatuses
     val rddInfos = stockRDDInfos
-    StorageUtils.updateRddInfo(storageStatuses, rddInfos)
+    StorageUtils.updateRddInfo(rddInfos, storageStatuses)
     assert(rddInfos(0).numCachedPartitions === 5)
     assert(rddInfos(0).memSize === 5L)
     assert(rddInfos(0).diskSize === 10L)

@@ -191,50 +191,50 @@ class StorageSuite extends FunSuite {
     assert(rddInfos(1).diskSize === 6L)
   }

-  // test("StorageUtils.rddInfoFromStorageStatus with updated blocks") {
-  //   val storageStatuses = stockStorageStatuses
-  //   val rddInfos = stockRDDInfos
-  //
-  //   // Drop 3 blocks from RDD 0, and cache more of RDD 1
-  //   val updatedBlocks1 = Seq(
-  //     (RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(0, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(0, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 0), BlockStatus(memAndDisk, 100L, 100L, 0L)),
-  //     (RDDBlockId(1, 100), BlockStatus(memAndDisk, 100L, 100L, 0L))
-  //   )
-  //   StorageUtils.rddInfoFromStorageStatus(storageStatuses, rddInfos, updatedBlocks1)
-  //   assert(rddInfos(0).numCachedPartitions === 2)
-  //   assert(rddInfos(0).memSize === 2L)
-  //   assert(rddInfos(0).diskSize === 4L)
-  //   assert(rddInfos(1).numCachedPartitions === 4)
-  //   assert(rddInfos(1).memSize === 202L)
-  //   assert(rddInfos(1).diskSize === 204L)
-  //
-  //   // Actually update storage statuses so we can chain the calls to rddInfoFromStorageStatus
-  //   updatedBlocks1.foreach { case (bid, bstatus) =>
-  //     val statusWithBlock = storageStatuses.find(_.blocks.contains(bid))
-  //     statusWithBlock match {
-  //       case Some(s) => s.updateBlock(bid, bstatus)
-  //       case None => storageStatuses(0).addBlock(bid, bstatus) // arbitrarily pick the first
-  //     }
-  //   }
-  //
-  //   // Drop all of RDD 1
-  //   val updatedBlocks2 = Seq(
-  //     (RDDBlockId(1, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
-  //     (RDDBlockId(1, 100), BlockStatus(memAndDisk, 0L, 0L, 0L))
-  //   )
-  //   StorageUtils.rddInfoFromStorageStatus(storageStatuses, rddInfos, updatedBlocks2)
-  //   assert(rddInfos(0).numCachedPartitions === 2)
-  //   assert(rddInfos(0).memSize === 2L)
-  //   assert(rddInfos(0).diskSize === 4L)
-  //   assert(rddInfos(1).numCachedPartitions === 0)
-  //   assert(rddInfos(1).memSize === 0L)
-  //   assert(rddInfos(1).diskSize === 0L)
-  // }
+  test("StorageUtils.updateRddInfo with updated blocks") {
+    val storageStatuses = stockStorageStatuses
+    val rddInfos = stockRDDInfos
+
+    // Drop 3 blocks from RDD 0, and cache more of RDD 1
+    val updatedBlocks1 = Seq(
+      (RDDBlockId(0, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
+      (RDDBlockId(0, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
+      (RDDBlockId(0, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
+      (RDDBlockId(1, 0), BlockStatus(memAndDisk, 100L, 100L, 0L)),
+      (RDDBlockId(1, 100), BlockStatus(memAndDisk, 100L, 100L, 0L))
+    )
+    StorageUtils.updateRddInfo(rddInfos, storageStatuses, updatedBlocks1)
+    assert(rddInfos(0).numCachedPartitions === 2)
+    assert(rddInfos(0).memSize === 2L)
+    assert(rddInfos(0).diskSize === 4L)
+    assert(rddInfos(1).numCachedPartitions === 4)
+    assert(rddInfos(1).memSize === 202L)
+    assert(rddInfos(1).diskSize === 204L)
+
+    // Actually update storage statuses so we can chain the calls to rddInfoFromStorageStatus
+    updatedBlocks1.foreach { case (bid, bstatus) =>
+      val statusWithBlock = storageStatuses.find(_.blocks.contains(bid))
+      statusWithBlock match {
+        case Some(s) => s.updateBlock(bid, bstatus)
+        case None => storageStatuses(0).addBlock(bid, bstatus) // arbitrarily pick the first
+      }
+    }
+
+    // Drop all of RDD 1
+    val updatedBlocks2 = Seq(
+      (RDDBlockId(1, 0), BlockStatus(memAndDisk, 0L, 0L, 0L)),
+      (RDDBlockId(1, 1), BlockStatus(memAndDisk, 0L, 0L, 0L)),
+      (RDDBlockId(1, 2), BlockStatus(memAndDisk, 0L, 0L, 0L)),
+      (RDDBlockId(1, 100), BlockStatus(memAndDisk, 0L, 0L, 0L))
+    )
+    StorageUtils.updateRddInfo(rddInfos, storageStatuses, updatedBlocks2)
+    assert(rddInfos(0).numCachedPartitions === 2)
+    assert(rddInfos(0).memSize === 2L)
+    assert(rddInfos(0).diskSize === 4L)
+    assert(rddInfos(1).numCachedPartitions === 0)
+    assert(rddInfos(1).memSize === 0L)
+    assert(rddInfos(1).diskSize === 0L)
+  }

   test("StorageUtils.getBlockLocations") {
     val storageStatuses = stockStorageStatuses