
Commit cf1d46e

andrewor14 authored and pwendell committed
[SPARK-2307][Reprise] Correctly report RDD blocks on SparkUI
**Problem.** The existing code in `ExecutorsPage.scala` requires a linear scan through all the blocks to filter out the uncached ones. Every refresh could be expensive if there are many blocks and many executors.

**Solution.** The proper semantics should be the following: `StorageStatusListener` should contain only block statuses that are cached. This means as soon as a block is unpersisted by any means, its status should be removed. This is reflected in the changes made in `StorageStatusListener.scala`. Further, the `StorageTab` must stop relying on the `StorageStatusListener` changing a dropped block's status to `StorageLevel.NONE` (which no longer happens). This is reflected in the changes made in `StorageTab.scala` and `StorageUtils.scala`.

----------

If you have been following this chain of PRs like pwendell, you will quickly notice that this reverts the changes in #1249, which reverts the changes in #1080. In other words, we are adding back the changes from #1080 and fixing SPARK-2307 on top of them. Please ask questions if you are confused.

Author: Andrew Or <[email protected]>

Closes #1255 from andrewor14/storage-ui-fix-reprise and squashes the following commits:

45416fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into storage-ui-fix-reprise
a82ea25 [Andrew Or] Add tests for StorageStatusListener
8773b01 [Andrew Or] Update comment / minor changes
3afde3f [Andrew Or] Correctly report the number of blocks on SparkUI

(cherry picked from commit 3894a49)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent 27a2afe commit cf1d46e
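
To make the new semantics concrete before diving into the diffs, here is a minimal, self-contained sketch of the rule this commit adopts. The types and names below are illustrative stand-ins, not the real Spark classes: the listener drops a block the moment its updated storage level is NONE, so the executors page can report `blocks.size` directly instead of rescanning and filtering on every refresh.

```scala
import scala.collection.mutable

object CachedBlocksSketch {
  // Stand-ins for Spark's BlockStatus / StorageLevel; hypothetical names
  case class BlockStatus(storageLevel: String, memSize: Long, diskSize: Long)
  val NONE = "NONE"

  private val blocks = mutable.Map[String, BlockStatus]()

  // New rule: an update to StorageLevel.NONE removes the block outright
  def updateBlock(id: String, status: BlockStatus): Unit = {
    if (status.storageLevel == NONE) blocks.remove(id)
    else blocks(id) = status
  }

  def main(args: Array[String]): Unit = {
    updateBlock("rdd_1_0", BlockStatus("MEMORY_ONLY", 100L, 0L))
    updateBlock("rdd_1_1", BlockStatus("DISK_ONLY", 0L, 200L))

    // Old ExecutorsPage: linear scan to skip uncached blocks on every refresh
    val oldCount = blocks.count { case (_, s) => s.storageLevel != NONE }
    // New ExecutorsPage: the map holds only cached blocks, so size suffices
    val newCount = blocks.size
    assert(oldCount == newCount && newCount == 2)

    updateBlock("rdd_1_0", BlockStatus(NONE, 0L, 0L)) // block unpersisted
    assert(blocks.size == 1) // reported count stays correct with no filtering
    println(s"cached blocks: ${blocks.keys.mkString(", ")}")
  }
}
```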

File tree

6 files changed: +184 −23 lines changed


core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 11 additions & 6 deletions
@@ -28,26 +28,31 @@ import org.apache.spark.scheduler._
  */
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
-  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
+  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

   def storageStatusList = executorIdToStorageStatus.values.toSeq

   /** Update storage status list to reflect updated block statuses */
-  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
+  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    val filteredStatus = executorIdToStorageStatus.get(execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
-        storageStatus.blocks(blockId) = updatedStatus
+        if (updatedStatus.storageLevel == StorageLevel.NONE) {
+          storageStatus.blocks.remove(blockId)
+        } else {
+          storageStatus.blocks(blockId) = updatedStatus
+        }
       }
     }
   }

   /** Update storage status list to reflect the removal of an RDD from the cache */
-  def updateStorageStatus(unpersistedRDDId: Int) {
+  private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
       val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+        storageStatus.blocks.remove(blockId)
       }
     }
   }

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 12 additions & 3 deletions
@@ -75,17 +75,26 @@ private[spark] object StorageUtils {
   /** Returns storage information of all RDDs in the given list. */
   def rddInfoFromStorageStatus(
       storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+      rddInfos: Seq[RDDInfo],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
+
+    // Mapping from a block ID -> its status
+    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
+
+    // Record updated blocks, if any
+    updatedBlocks
+      .collect { case (id: RDDBlockId, status) => (id, status) }
+      .foreach { case (id, status) => blockMap(id) = status }

     // Mapping from RDD ID -> an array of associated BlockStatuses
-    val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+    val rddBlockMap = blockMap
       .groupBy { case (k, _) => k.rddId }
       .mapValues(_.values.toArray)

     // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
     val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap

-    val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
+    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
         blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
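
As a sanity check on the new `rddInfoFromStorageStatus` flow above, the following sketch (simplified stand-in types, not Spark's real ones) applies the same two steps: overlay the per-task updated blocks onto the block snapshot so fresher statuses win, then group blocks by RDD id to aggregate sizes per RDD.

```scala
import scala.collection.mutable

object RddInfoAggregationSketch {
  // Hypothetical stand-ins for Spark's RDDBlockId and BlockStatus
  case class RDDBlockId(rddId: Int, splitIndex: Int)
  case class BlockStatus(memSize: Long, diskSize: Long)

  def main(args: Array[String]): Unit = {
    // Snapshot of cached RDD blocks across all executors
    val snapshot = Seq(
      RDDBlockId(1, 0) -> BlockStatus(100L, 0L),
      RDDBlockId(1, 1) -> BlockStatus(100L, 0L),
      RDDBlockId(4, 0) -> BlockStatus(0L, 300L))
    // Fresher statuses reported by a task that just finished
    val updatedBlocks = Seq(RDDBlockId(1, 1) -> BlockStatus(0L, 150L))

    // Step 1: overlay updated blocks so they overwrite stale snapshot entries
    val blockMap = mutable.Map(snapshot: _*)
    updatedBlocks.foreach { case (id, status) => blockMap(id) = status }

    // Step 2: group by RDD id and sum sizes, as the RDDInfo refresh does
    blockMap.groupBy { case (id, _) => id.rddId }
      .mapValues(_.values.toSeq)
      .foreach { case (rddId, blocks) =>
        val mem = blocks.map(_.memSize).sum
        val disk = blocks.map(_.diskSize).sum
        println(s"rdd $rddId: ${blocks.size} blocks, mem=$mem, disk=$disk")
      }
  }
}
```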

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 1 addition & 3 deletions
@@ -108,9 +108,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.count { case (_, blockStatus) =>
-      blockStatus.storageLevel != StorageLevel.NONE
-    }
+    val rddBlocks = status.blocks.size
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 1 addition & 3 deletions
@@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()

core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala

Lines changed: 7 additions & 8 deletions
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
+import org.apache.spark.storage._

 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
@@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
  * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
 @DeveloperApi
-class StorageListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()

   def storageStatusList = storageStatusListener.storageStatusList
@@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq

   /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
     val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
+    val updatedRddInfos =
+      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
     updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
   }

@@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val metrics = taskEnd.taskMetrics
     if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
+      updateRDDInfo(metrics.updatedBlocks.get)
     }
   }

@@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   }

   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
+    _rddInfoMap.remove(unpersistRDD.rddId)
   }
 }
core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+
+/**
+ * Test the behavior of StorageStatusListener in response to all relevant events.
+ */
+class StorageStatusListenerSuite extends FunSuite {
+  private val bm1 = BlockManagerId("big", "dog", 1, 1)
+  private val bm2 = BlockManagerId("fat", "duck", 2, 2)
+  private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+  private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+
+  test("block manager added/removed") {
+    val listener = new StorageStatusListener
+
+    // Block manager add
+    assert(listener.executorIdToStorageStatus.size === 0)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
+    assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    assert(listener.executorIdToStorageStatus.size === 2)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
+    assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+
+    // Block manager remove
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+    assert(listener.executorIdToStorageStatus.size === 0)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+  }
+
+  test("task end without updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics = new TaskMetrics
+
+    // Task end with no updated blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("task end with updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+
+    // Task end with new blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+
+    // Task end with dropped blocks
+    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
+    taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("unpersist RDD") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+
+    // Unpersist RDD
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+  }
+}
