core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -28,26 +28,31 @@ import org.apache.spark.scheduler._
  */
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
-  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
+  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()

   def storageStatusList = executorIdToStorageStatus.values.toSeq

   /** Update storage status list to reflect updated block statuses */
-  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
+  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    val filteredStatus = executorIdToStorageStatus.get(execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
-        storageStatus.blocks(blockId) = updatedStatus
+        if (updatedStatus.storageLevel == StorageLevel.NONE) {
+          storageStatus.blocks.remove(blockId)
+        } else {
+          storageStatus.blocks(blockId) = updatedStatus
+        }
       }
     }

Contributor: Should you update the docs above executorIdToStorageStatus to say that it only stores persisted blocks?

Contributor Author: will do

   }

   /** Update storage status list to reflect the removal of an RDD from the cache */
-  def updateStorageStatus(unpersistedRDDId: Int) {
+  private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
       val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+        storageStatus.blocks.remove(blockId)
       }
     }
   }
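
In effect, a block whose updated status is StorageLevel.NONE now vanishes from the executor's block map instead of lingering as a NONE entry. A minimal sketch of that behavior (hypothetical executor/host names and block sizes; the event wiring mirrors the test suite at the end of this diff):

    import org.apache.spark.Success
    import org.apache.spark.executor.TaskMetrics
    import org.apache.spark.scheduler._
    import org.apache.spark.storage._

    val listener = new StorageStatusListener
    val bm = BlockManagerId("exec-1", "host-1", 1, 1)
    val taskInfo = new TaskInfo(0, 0, 0, 0, "exec-1", "host-1", TaskLocality.ANY, false)
    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm, 1000L))

    // A task caches one block: the listener records it
    val cached = new TaskMetrics
    cached.updatedBlocks = Some(Seq((RDDBlockId(0, 0), BlockStatus(StorageLevel.MEMORY_ONLY, 100L, 0L, 0L))))
    listener.onTaskEnd(SparkListenerTaskEnd(1, "taskType", Success, taskInfo, cached))
    assert(listener.storageStatusList.head.blocks.size == 1)

    // A later task drops the same block: the entry is removed outright,
    // rather than kept around with StorageLevel.NONE as before
    val dropped = new TaskMetrics
    dropped.updatedBlocks = Some(Seq((RDDBlockId(0, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))))
    listener.onTaskEnd(SparkListenerTaskEnd(1, "taskType", Success, taskInfo, dropped))
    assert(listener.storageStatusList.head.blocks.isEmpty)
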
15 changes: 12 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -75,17 +75,26 @@ private[spark] object StorageUtils {
   /** Returns storage information of all RDDs in the given list. */
   def rddInfoFromStorageStatus(
       storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+      rddInfos: Seq[RDDInfo],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {

+    // Mapping from a block ID -> its status
+    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
+
+    // Record updated blocks, if any
+    updatedBlocks
+      .collect { case (id: RDDBlockId, status) => (id, status) }
+      .foreach { case (id, status) => blockMap(id) = status }
+
     // Mapping from RDD ID -> an array of associated BlockStatuses
-    val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+    val rddBlockMap = blockMap
       .groupBy { case (k, _) => k.rddId }
       .mapValues(_.values.toArray)

     // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
     val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap

-    val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
+    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
         blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
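
The updatedBlocks overlay lets the UI fold in block changes from the task that just finished without waiting for a fresh executor snapshot. A self-contained sketch of the same overlay-then-group idiom, with toy types standing in for Spark's (not the Spark API):

    import scala.collection.mutable

    case class Block(rddId: Int, split: Int)
    val snapshot = Map(Block(1, 0) -> 100L, Block(1, 1) -> 200L, Block(2, 0) -> 50L)
    val updates = Seq(Block(1, 1) -> 0L)  // this block changed in the latest task

    // Later entries win: updates overwrite the stale snapshot values
    val merged = mutable.Map(snapshot.toSeq: _*)
    updates.foreach { case (id, size) => merged(id) = size }

    // Regroup the merged view by RDD, as rddBlockMap does above
    val byRdd = merged.groupBy { case (k, _) => k.rddId }.mapValues(_.values.toArray)
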
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -108,9 +108,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.count { case (_, blockStatus) =>
-      blockStatus.storageLevel != StorageLevel.NONE
-    }
+    val rddBlocks = status.blocks.size
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
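
Since StorageStatusListener now evicts NONE-level blocks as they are dropped, every entry left in status.blocks is cached, and the per-render filter becomes dead weight. A sketch of the equivalence under that invariant:

    // Old: scan all blocks and skip the uncached ones on every page render
    val rddBlocksOld = status.blocks.count { case (_, s) => s.storageLevel != StorageLevel.NONE }
    // New: the map holds only cached blocks, so its size is already the count
    val rddBlocksNew = status.blocks.size
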
core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") {
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()
15 changes: 7 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
+import org.apache.spark.storage._

 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
@@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
  * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
 @DeveloperApi
-class StorageListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()

   def storageStatusList = storageStatusListener.storageStatusList
@@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq

   /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
     val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
+    val updatedRddInfos =
+      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
     updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
   }

@@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val metrics = taskEnd.taskMetrics
     if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
+      updateRDDInfo(metrics.updatedBlocks.get)
     }
   }

@@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   }

   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
+    _rddInfoMap.remove(unpersistRDD.rddId)
   }
 }
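
Putting the pieces together, a single event now drives both listeners consistently. A sketch of the flow (listener registration order on the bus is assumed, not shown in this diff):

    // onTaskEnd(taskEnd):
    //   1. StorageStatusListener applies taskEnd.taskMetrics.updatedBlocks to the
    //      executor's block map, inserting, updating, or removing entries.
    //   2. StorageListener forwards the same updatedBlocks to updateRDDInfo, which
    //      overlays them on the aggregated snapshot in rddInfoFromStorageStatus.
    // onUnpersistRDD(unpersistRDD):
    //   1. StorageStatusListener evicts all of the RDD's blocks on every executor.
    //   2. StorageListener drops the RDDInfo outright instead of recomputing it.
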
152 changes: 152 additions & 0 deletions core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import org.scalatest.FunSuite
import org.apache.spark.Success
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._

/**
* Test the behavior of StorageStatusListener in response to all relevant events.
*/
class StorageStatusListenerSuite extends FunSuite {
private val bm1 = BlockManagerId("big", "dog", 1, 1)
private val bm2 = BlockManagerId("fat", "duck", 2, 2)
private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)

test("block manager added/removed") {
val listener = new StorageStatusListener

// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
assert(listener.executorIdToStorageStatus.size === 1)
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)

// Block manager remove
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
}

test("task end without updated blocks") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
val taskMetrics = new TaskMetrics

// Task end with no updated blocks
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
}

test("task end with updated blocks") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
taskMetrics2.updatedBlocks = Some(Seq(block3))

// Task end with new blocks
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))

// Task end with dropped blocks
val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
}

test("unpersist RDD") {
val listener = new StorageStatusListener
listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
taskMetrics2.updatedBlocks = Some(Seq(block3))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").blocks.size === 3)

// Unpersist RDD
listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
}
}