From 3afde3fff4e07296b7fc9ddab50a76e3a321bf53 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 27 Jun 2014 23:01:25 -0700
Subject: [PATCH 1/3] Correctly report the number of blocks on SparkUI

This is actually quite tricky to get right. With this commit,
StorageStatusListener will only hold cached blocks (i.e. no blocks with
StorageLevel.NONE). This means the StorageTab needs special handling,
because it currently relies on dropped blocks having StorageLevel.NONE,
rather than disappearing altogether in the storage status list.
---
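Notes below the fold: the user-visible contract changes as follows. This
is a sketch using only names from the diffs below, illustrative rather
than part of the patch itself:

    // Old contract: a dropped block lingered in the map with
    // StorageLevel.NONE, so consumers had to filter it out explicitly.
    def oldCachedBlockCount(status: StorageStatus): Int =
      status.blocks.count { case (_, s) => s.storageLevel != StorageLevel.NONE }

    // New contract: dropped blocks are removed outright, so the plain
    // size of the map is already the number of cached blocks.
    def newCachedBlockCount(status: StorageStatus): Int =
      status.blocks.size

This is exactly the simplification applied to ExecutorsPage below.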
 .../spark/storage/StorageStatusListener.scala     |  8 ++++++--
 .../org/apache/spark/storage/StorageUtils.scala   | 15 ++++++++++++---
 .../org/apache/spark/ui/exec/ExecutorsPage.scala  |  4 +---
 .../org/apache/spark/ui/exec/ExecutorsTab.scala   |  4 +---
 .../org/apache/spark/ui/storage/StorageTab.scala  | 15 +++++++--------
 5 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index a6e6627d54e0..628cc07a4042 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -37,7 +37,11 @@ class StorageStatusListener extends SparkListener {
     val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
-        storageStatus.blocks(blockId) = updatedStatus
+        if (updatedStatus.storageLevel == StorageLevel.NONE) {
+          storageStatus.blocks.remove(blockId)
+        } else {
+          storageStatus.blocks(blockId) = updatedStatus
+        }
       }
     }
   }
@@ -47,7 +51,7 @@ class StorageStatusListener extends SparkListener {
     storageStatusList.foreach { storageStatus =>
       val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks(blockId) = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+        storageStatus.blocks.remove(blockId)
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index f3bde1df45c7..177281f66336 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -75,17 +75,26 @@ private[spark] object StorageUtils {
   /** Returns storage information of all RDDs in the given list. */
   def rddInfoFromStorageStatus(
       storageStatuses: Seq[StorageStatus],
-      rddInfos: Seq[RDDInfo]): Array[RDDInfo] = {
+      rddInfos: Seq[RDDInfo],
+      updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty): Array[RDDInfo] = {
+
+    // Mapping from a block ID -> its status
+    val blockMap = mutable.Map(storageStatuses.flatMap(_.rddBlocks): _*)
+
+    // Record updated blocks, if any
+    updatedBlocks
+      .collect { case (id: RDDBlockId, status) => (id, status) }
+      .foreach { case (id, status) => blockMap(id) = status }
 
     // Mapping from RDD ID -> an array of associated BlockStatuses
-    val blockStatusMap = storageStatuses.flatMap(_.rddBlocks).toMap
+    val rddBlockMap = blockMap
       .groupBy { case (k, _) => k.rddId }
       .mapValues(_.values.toArray)
 
     // Mapping from RDD ID -> the associated RDDInfo (with potentially outdated storage information)
     val rddInfoMap = rddInfos.map { info => (info.id, info) }.toMap
 
-    val rddStorageInfos = blockStatusMap.flatMap { case (rddId, blocks) =>
+    val rddStorageInfos = rddBlockMap.flatMap { case (rddId, blocks) =>
       // Add up memory, disk and Tachyon sizes
       val persistedBlocks =
         blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 6cfc46c7e78f..9b490b7427f3 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -108,9 +108,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val status = listener.storageStatusList(statusId)
     val execId = status.blockManagerId.executorId
     val hostPort = status.blockManagerId.hostPort
-    val rddBlocks = status.blocks.count { case (_, blockStatus) =>
-      blockStatus.storageLevel != StorageLevel.NONE
-    }
+    val rddBlocks = status.blocks.size
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 91d37b835b19..38eda4121157 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -39,9 +39,7 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "execut
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index c4bb7aab5039..0cc0cf311717 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.ui._
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
+import org.apache.spark.storage._
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") {
@@ -40,9 +40,7 @@ private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage"
  * A SparkListener that prepares information to be displayed on the BlockManagerUI.
  */
 @DeveloperApi
-class StorageListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
+class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener {
   private val _rddInfoMap = mutable.Map[Int, RDDInfo]()
 
   def storageStatusList = storageStatusListener.storageStatusList
@@ -51,9 +49,10 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
 
   /** Update each RDD's info to reflect any updates to the RDD's storage status */
-  private def updateRDDInfo() {
+  private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)] = Seq.empty) {
     val rddInfos = _rddInfoMap.values.toSeq
-    val updatedRddInfos = StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos)
+    val updatedRddInfos =
+      StorageUtils.rddInfoFromStorageStatus(storageStatusList, rddInfos, updatedBlocks)
     updatedRddInfos.foreach { info => _rddInfoMap(info.id) = info }
   }
 
@@ -64,7 +63,7 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
     val metrics = taskEnd.taskMetrics
     if (metrics != null && metrics.updatedBlocks.isDefined) {
-      updateRDDInfo()
+      updateRDDInfo(metrics.updatedBlocks.get)
     }
   }
 
@@ -79,6 +78,6 @@ class StorageListener(storageStatusListener: StorageStatusListener)
   }
 
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
-    updateRDDInfo()
+    _rddInfoMap.remove(unpersistRDD.rddId)
   }
 }

From 8773b0133322bb51a3b591d96827e0ee831f4c2e Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 28 Jun 2014 13:17:22 -0700
Subject: [PATCH 2/3] Update comment / minor changes

---
 .../org/apache/spark/storage/StorageStatusListener.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 628cc07a4042..84fa22911eb4 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -28,13 +28,14 @@ import org.apache.spark.scheduler._
  */
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
+  // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
   private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
 
   def storageStatusList = executorIdToStorageStatus.values.toSeq
 
   /** Update storage status list to reflect updated block statuses */
-  def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = storageStatusList.find(_.blockManagerId.executorId == execId)
+  private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
+    val filteredStatus = executorIdToStorageStatus.get(execId)
     filteredStatus.foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
         if (updatedStatus.storageLevel == StorageLevel.NONE) {
@@ -47,7 +48,7 @@ class StorageStatusListener extends SparkListener {
   }
 
   /** Update storage status list to reflect the removal of an RDD from the cache */
-  def updateStorageStatus(unpersistedRDDId: Int) {
+  private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
       val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
       unpersistedBlocksIds.foreach { blockId =>

From a82ea25e57217be3b58c78508452917acefbb1dc Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Sat, 28 Jun 2014 14:02:18 -0700
Subject: [PATCH 3/3] Add tests for StorageStatusListener

---
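The suite below drives the listener directly with SparkListener events.
The invariant it exercises, written out as a standalone sketch (again
illustrative, not part of the patch):

    // After any sequence of task ends, block updates and RDD unpersists,
    // no block tracked by the listener should carry StorageLevel.NONE.
    def noPhantomBlocks(listener: StorageStatusListener): Boolean =
      listener.storageStatusList.forall { status =>
        status.blocks.values.forall(_.storageLevel != StorageLevel.NONE)
      }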
 .../spark/storage/StorageStatusListener.scala      |   2 +-
 .../storage/StorageStatusListenerSuite.scala       | 152 ++++++++++++++++++
 2 files changed, 153 insertions(+), 1 deletion(-)
 create mode 100644 core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index 84fa22911eb4..41c960c867e2 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -29,7 +29,7 @@ import org.apache.spark.scheduler._
 @DeveloperApi
 class StorageStatusListener extends SparkListener {
   // This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
-  private val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
+  private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
 
   def storageStatusList = executorIdToStorageStatus.values.toSeq
 
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
new file mode 100644
index 000000000000..2179c6dd3302
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import org.scalatest.FunSuite
+import org.apache.spark.Success
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+
+/**
+ * Test the behavior of StorageStatusListener in response to all relevant events.
+ */
+class StorageStatusListenerSuite extends FunSuite {
+  private val bm1 = BlockManagerId("big", "dog", 1, 1)
+  private val bm2 = BlockManagerId("fat", "duck", 2, 2)
+  private val taskInfo1 = new TaskInfo(0, 0, 0, 0, "big", "dog", TaskLocality.ANY, false)
+  private val taskInfo2 = new TaskInfo(0, 0, 0, 0, "fat", "duck", TaskLocality.ANY, false)
+
+  test("block manager added/removed") {
+    val listener = new StorageStatusListener
+
+    // Block manager add
+    assert(listener.executorIdToStorageStatus.size === 0)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
+    assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    assert(listener.executorIdToStorageStatus.size === 2)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
+    assert(listener.executorIdToStorageStatus("fat").maxMem === 2000L)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+
+    // Block manager remove
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+    assert(listener.executorIdToStorageStatus.size === 1)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(listener.executorIdToStorageStatus.get("fat").isDefined)
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+    assert(listener.executorIdToStorageStatus.size === 0)
+    assert(!listener.executorIdToStorageStatus.get("big").isDefined)
+    assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
+  }
+
+  test("task end without updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics = new TaskMetrics
+
+    // Task end with no updated blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("task end with updated blocks") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+
+    // Task end with new blocks
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+
+    // Task end with dropped blocks
+    val droppedBlock1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
+    taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 1)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.contains(RDDBlockId(4, 0)))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 1)
+    assert(listener.executorIdToStorageStatus("fat").blocks.size === 0)
+    assert(!listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    assert(listener.executorIdToStorageStatus("fat").blocks.isEmpty)
+  }
+
+  test("unpersist RDD") {
+    val listener = new StorageStatusListener
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    val taskMetrics1 = new TaskMetrics
+    val taskMetrics2 = new TaskMetrics
+    val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
+    val block2 = (RDDBlockId(1, 2), BlockStatus(StorageLevel.DISK_ONLY, 0L, 200L, 0L))
+    val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
+    taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
+    taskMetrics2.updatedBlocks = Some(Seq(block3))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+    listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+
+    // Unpersist RDD
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(9090))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 3)
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(4))
+    assert(listener.executorIdToStorageStatus("big").blocks.size === 2)
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 1)))
+    assert(listener.executorIdToStorageStatus("big").blocks.contains(RDDBlockId(1, 2)))
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(1))
+    assert(listener.executorIdToStorageStatus("big").blocks.isEmpty)
+  }
+}
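
For reference, a minimal sketch of how this listener is wired up and read
(SparkUI registers it internally; illustrative only, assuming the 1.0-era
SparkContext.addSparkListener API and an existing SparkContext `sc`):

    import org.apache.spark.storage.StorageStatusListener

    // Registration, as SparkUI does when constructing its tabs
    val listener = new StorageStatusListener
    sc.addSparkListener(listener)

    // Reporting, mirroring ExecutorsPage: after this series, blocks.size
    // is the number of cached blocks, with no StorageLevel.NONE filtering
    listener.storageStatusList.foreach { status =>
      println(s"${status.blockManagerId.executorId}: ${status.blocks.size} cached blocks")
    }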