Commit 3afde3f (parent: 0e0686d)

Correctly report the number of blocks on SparkUI

This is actually quite tricky to get right. With this commit, StorageStatusListener only holds cached blocks (i.e. no blocks with StorageLevel.NONE). As a result, the StorageTab needs special handling: it previously relied on dropped blocks having StorageLevel.NONE rather than disappearing from the storage status list altogether.
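To make the new invariant concrete, here is a minimal sketch of the bookkeeping change, using a simplified stand-in type rather than Spark's real BlockId/BlockStatus classes: a dropped block used to linger in the map with StorageLevel.NONE and had to be filtered out at every read; it is now removed outright, so the map's size is itself the cached-block count.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the real BlockStatus, for illustration only.
case class Status(storageLevel: String) // "NONE" marks a dropped block

val blocks = mutable.Map[String, Status]()

// The new rule: an update to StorageLevel.NONE deletes the entry instead of
// storing a zeroed-out status.
def updateBlock(blockId: String, status: Status): Unit = {
  if (status.storageLevel == "NONE") blocks.remove(blockId)
  else blocks(blockId) = status
}

updateBlock("rdd_0_0", Status("MEMORY_ONLY"))
updateBlock("rdd_0_1", Status("MEMORY_ONLY"))
updateBlock("rdd_0_1", Status("NONE"))  // the block was dropped
assert(blocks.size == 1)                // size == number of cached blocks
```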

File tree: 5 files changed (+27, −19 lines)

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala (6 additions, 2 deletions)

@@ -37,7 +37,11 @@ class StorageStatusListener extends SparkListener {
     val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
-        storageStatus.blocks(blockId) = updatedStatus
+        if (updatedStatus.storageLevel == StorageLevel.NONE) {
+          storageStatus.blocks.remove(blockId)
+        } else {
+          storageStatus.blocks(blockId) = updatedStatus
+        }
       }
     }
   }

@@ -47,7 +51,7 @@ class StorageStatusListener extends SparkListener {
     storageStatusList.foreach { storageStatus =>
       val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+        storageStatus.blocks.remove(blockId)
       }
     }
   }
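The unpersist path in the second hunk follows the same principle. A self-contained miniature (again with hypothetical stand-in types, not the real ones): collect the unpersisted RDD's block IDs first, then remove the entries, instead of overwriting each one with a zeroed NONE status.

```scala
import scala.collection.mutable

case class RDDBlockId(rddId: Int, splitIndex: Int) // stand-in for the real class

val blocks = mutable.Map(
  RDDBlockId(0, 0) -> "MEMORY_ONLY",
  RDDBlockId(0, 1) -> "DISK_ONLY",
  RDDBlockId(1, 0) -> "MEMORY_ONLY")

def unpersistRDD(rddId: Int): Unit = {
  // Snapshot the matching keys before mutating the map.
  val unpersistedBlockIds = blocks.keys.filter(_.rddId == rddId).toSeq
  unpersistedBlockIds.foreach(blocks.remove)
}

unpersistRDD(0)
assert(blocks.keySet == Set(RDDBlockId(1, 0)))
```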

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala (12 additions, 3 deletions)

@@ -75,17 +75,26 @@ private[spark] object StorageUtils {
   /** Returns storage information of all RDDs in the given list. */
   def rddInfoFromStorageStatus(
       storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+      rddInfos: Seq[RDDInfo],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
+
+    // Mapping from a block ID -> its status
+    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
+
+    // Record updated blocks, if any
+    updatedBlocks
+      .collect { case (id: RDDBlockId, status) => (id, status) }
+      .foreach { case (id, status) => blockMap(id) = status }
 
     // Mapping from RDD ID -> an array of associated BlockStatuses
-    val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+    val rddBlockMap = blockMap
       .groupBy { case (k, _) => k.rddId }
       .mapValues(_.values.toArray)
 
     // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
     val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
 
-    val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
+    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
         blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
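The overlay at the top of the extended rddInfoFromStorageStatus can be pictured in isolation. A hedged miniature with simplified stand-in types: blocks already known to the storage statuses seed a mutable map, the just-updated blocks are written over them, and only then does the per-RDD grouping happen, so the aggregation sees the freshest status for each block even before the storage statuses themselves catch up.

```scala
import scala.collection.mutable

// Simplified stand-ins for RDDBlockId/BlockStatus.
case class RDDBlockId(rddId: Int, splitIndex: Int)
case class BlockStatus(memSize: Long, diskSize: Long)

// Blocks already known to the storage statuses.
val known = Seq(RDDBlockId(0, 0) -> BlockStatus(100L, 0L))
// Blocks reported by the task that just finished.
val updated = Seq(
  RDDBlockId(0, 0) -> BlockStatus(0L, 200L), // moved from memory to disk
  RDDBlockId(0, 1) -> BlockStatus(50L, 0L))  // newly cached

val blockMap = mutable.Map(known: _*)
updated.foreach { case (id, status) => blockMap(id) = status } // overlay wins

// Group per RDD, as the real code does with groupBy on rddId.
val perRdd = blockMap
  .groupBy { case (id, _) => id.rddId }
  .mapValues(_.values.toSeq)

assert(perRdd(0).map(_.diskSize).sum == 200L)
assert(perRdd(0).map(_.memSize).sum == 50L)
```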

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala (1 addition, 3 deletions)

@@ -108,9 +108,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.count { case (_, blockStatus) =>
-      blockStatus.storageLevel != StorageLevel.NONE
-    }
+    val rddBlocks = status.blocks.size
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
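No behavioral subtlety here: because StorageStatusListener now never stores a block with StorageLevel.NONE, the old count-with-filter and the plain blocks.size always agree, and the filter would now be a no-op on every read.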

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala (1 addition, 3 deletions)

@@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()

core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala (7 additions, 8 deletions)

@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
+import org.apache.spark.storage._
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {

@@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
  * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
 @DeveloperApi
-class StorageListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
 
   def storageStatusList = storageStatusListener.storageStatusList

@@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
 
   /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
     val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
+    val updatedRddInfos =
+      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
     updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
   }

@@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val metrics = taskEnd.taskMetrics
     if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
+      updateRDDInfo(metrics.updatedBlocks.get)
     }
   }

@@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   }
 
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
+    _rddInfoMap.remove(unpersistRDD.rddId)
   }
 }
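The onUnpersistRDD change is the "special handling" the commit message mentions. Previously, updateRDDInfo() would rediscover the unpersisted RDD's blocks with StorageLevel.NONE and recompute its RDDInfo down to zero cached partitions. Now those blocks vanish from the storage status list entirely, so a recomputation would simply skip the RDD and leave its stale RDDInfo in place; the entry has to be dropped explicitly. A minimal sketch of that failure mode and the fix (hypothetical mini-listener state, not the real StorageListener):

```scala
import scala.collection.mutable

case class RDDInfo(id: Int, numCachedPartitions: Int) // simplified stand-in

val rddInfoMap = mutable.Map(0 -> RDDInfo(0, 4), 1 -> RDDInfo(1, 2))

// With no NONE-level blocks left to recompute from, recomputation would keep
// RDDInfo(0, 4) forever; the UI entry must be removed directly instead.
def onUnpersistRDD(rddId: Int): Unit = {
  rddInfoMap.remove(rddId)
}

onUnpersistRDD(0)
assert(rddInfoMap.keySet == Set(1))
```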
