
Commit d934801

andrewor14 authored and pwendell committed
[SPARK-2316] Avoid O(blocks) operations in listeners
The existing code in `StorageUtils` is not the most efficient. Every time we want to update an `RDDInfo` we end up iterating through all blocks on all block managers just to discard most of them. The symptoms manifest themselves in the bountiful UI bugs observed in the wild. Many of these bugs are caused by the slow consumption of events in `LiveListenerBus`, which frequently leads to the event queue overflowing and `SparkListenerEvent`s being dropped on the floor. The changes made in this PR avoid this by first filtering out only the blocks relevant to us before computing storage information from them.

It's worth a mention that this corner of the Spark code is also not very well-tested at all. The bulk of the changes in this PR (more than 60%) is actually test cases for the various logic in `StorageUtils.scala` as well as `StorageTab.scala`. These will eventually be extended to cover the various listeners that constitute the `SparkUI`.

Author: Andrew Or <[email protected]>

Closes #1679 from andrewor14/fix-drop-events and squashes the following commits:

f80c1fa [Andrew Or] Rewrite fold and reduceOption as sum
e132d69 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
14fa1c3 [Andrew Or] Simplify some code + update a few comments
a91be46 [Andrew Or] Make ExecutorsPage blazingly fast
bf6f09b [Andrew Or] Minor changes
8981de1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
af19bc0 [Andrew Or] *UsedByRDD -> *UsedByRdd (minor)
6970bc8 [Andrew Or] Add extensive tests for StorageListener and the new code in StorageUtils
e080b9e [Andrew Or] Reduce run time of StorageUtils.updateRddInfo to near constant
2c3ef6a [Andrew Or] Actually filter out only the relevant RDDs
6fef86a [Andrew Or] Add extensive tests for new code in StorageStatus
b66b6b0 [Andrew Or] Use more efficient underlying data structures for blocks
6a7b7c0 [Andrew Or] Avoid chained operations on TraversableLike
a9ec384 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
b12fcd7 [Andrew Or] Fix tests + simplify sc.getRDDStorageInfo
da8e322 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-drop-events
8e91921 [Andrew Or] Iterate through a filtered set of blocks when updating RDDInfo
7b2c4aa [Andrew Or] Rewrite blockLocationsFromStorageStatus + clean up method signatures
41fa50d [Andrew Or] Add a legacy constructor for StorageStatus
53af15d [Andrew Or] Refactor StorageStatus + add a bunch of tests
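The core of the fix is to stop scanning every block on every block manager each time one RDD's storage info changes, and instead keep blocks grouped by RDD ID so each update only touches the blocks it cares about. A minimal sketch of the idea, using illustrative `(rddId, memSize)` pairs rather than Spark's actual block types:

```scala
// Illustrative data: (rddId, memSize) pairs standing in for real block statuses.
val allBlocks: Seq[(Int, Long)] = Seq((0, 100L), (1, 200L), (0, 50L))

// Before: each update scans all blocks and discards most of them, O(total blocks).
def memUsedSlow(rddId: Int): Long =
  allBlocks.filter(_._1 == rddId).map(_._2).sum

// After: group blocks by RDD ID once; each update then touches only its own RDD.
val blocksByRdd: Map[Int, Seq[Long]] =
  allBlocks.groupBy(_._1).map { case (id, blocks) => id -> blocks.map(_._2) }

def memUsedFast(rddId: Int): Long =
  blocksByRdd.getOrElse(rddId, Seq.empty).sum
```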
1 parent dab3796 commit d934801

13 files changed (+843 additions, -176 deletions)


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 2 deletions
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
-import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
+import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

@@ -843,7 +843,9 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
-    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
   }

   /**
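For reference, a minimal usage sketch of the rewritten developer API (assuming a live `SparkContext` named `sc`; `getRDDStorageInfo` now returns info only for RDDs that are actually cached):

```scala
sc.getRDDStorageInfo.foreach { info =>
  println(s"RDD ${info.id} '${info.name}': " +
    s"${info.numCachedPartitions}/${info.numPartitions} partitions cached")
}
```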

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 10 additions & 4 deletions
@@ -267,9 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }

   private def storageStatus: Array[StorageStatus] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
-      new StorageStatus(blockManagerId, info.maxMem, blockMap)
+    blockManagerInfo.map { case (blockManagerId, info) =>
+      new StorageStatus(blockManagerId, info.maxMem, info.blocks)
     }.toArray
   }

@@ -424,7 +423,14 @@ case class BlockStatus(
     storageLevel: StorageLevel,
     memSize: Long,
     diskSize: Long,
-    tachyonSize: Long)
+    tachyonSize: Long) {
+  def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+}
+
+@DeveloperApi
+object BlockStatus {
+  def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+}

 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
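The new `isCached` helper and `BlockStatus.empty` give callers a uniform way to represent and test for dropped blocks. A small sketch of the semantics:

```scala
import org.apache.spark.storage.{BlockStatus, StorageLevel}

val dropped  = BlockStatus.empty                                    // all sizes zero
val inMemory = BlockStatus(StorageLevel.MEMORY_ONLY, 1024L, 0L, 0L)

assert(!dropped.isCached)  // memSize + diskSize + tachyonSize == 0
assert(inMemory.isCached)
```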

core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala

Lines changed: 5 additions & 9 deletions
@@ -30,36 +30,32 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+      val maxMem = storageStatusList.map(_.maxMem).sum
       maxMem / 1024 / 1024
     }
   })

   metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      val remainingMem = storageStatusList.map(_.memRemaining).sum
       remainingMem / 1024 / 1024
     }
   })

   metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      val maxMem = storageStatusList.map(_.maxMem).sum
+      val remainingMem = storageStatusList.map(_.memRemaining).sum
       (maxMem - remainingMem) / 1024 / 1024
     }
   })

   metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList
-        .flatMap(_.blocks.values.map(_.diskSize))
-        .reduceOption(_ + _)
-        .getOrElse(0L)
-
+      val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
       diskSpaceUsed / 1024 / 1024
     }
   })
core/src/main/scala/org/apache/spark/storage/RDDInfo.scala

Lines changed: 2 additions & 0 deletions
@@ -34,6 +34,8 @@ class RDDInfo(
   var diskSize = 0L
   var tachyonSize = 0L

+  def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+
   override def toString = {
     import Utils.bytesToString
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +

core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

Lines changed: 5 additions & 7 deletions
@@ -35,13 +35,12 @@ class StorageStatusListener extends SparkListener {
 
   /** Update storage status list to reflect updated block statuses */
   private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = executorIdToStorageStatus.get(execId)
-    filteredStatus.foreach { storageStatus =>
+    executorIdToStorageStatus.get(execId).foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
         if (updatedStatus.storageLevel == StorageLevel.NONE) {
-          storageStatus.blocks.remove(blockId)
+          storageStatus.removeBlock(blockId)
         } else {
-          storageStatus.blocks(blockId) = updatedStatus
+          storageStatus.updateBlock(blockId, updatedStatus)
         }
       }
     }
@@ -50,9 +49,8 @@ class StorageStatusListener extends SparkListener {
   /** Update storage status list to reflect the removal of an RDD from the cache */
   private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
-      val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
-      unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks.remove(blockId)
+      storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
+        storageStatus.removeBlock(blockId)
       }
     }
   }
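The listener now relies on `StorageStatus` exposing `updateBlock`, `removeBlock`, and `rddBlocksById` rather than a raw `blocks` map, so unpersisting one RDD only visits that RDD's blocks. A self-contained sketch of how such an index might be organized (illustrative stand-in types, not the real `StorageStatus.scala`):

```scala
import scala.collection.mutable

// Illustrative stand-ins for Spark's block types.
sealed trait BlockId
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
case class BlockStatus(memSize: Long, diskSize: Long)

class IndexedBlocks {
  // Blocks grouped by RDD ID, so per-RDD operations never scan unrelated blocks.
  private val rddBlocks = mutable.HashMap[Int, mutable.HashMap[BlockId, BlockStatus]]()

  def updateBlock(blockId: BlockId, status: BlockStatus): Unit = blockId match {
    case RDDBlockId(rddId, _) =>
      rddBlocks.getOrElseUpdate(rddId, mutable.HashMap.empty)(blockId) = status
    case _ => // non-RDD blocks would live in a separate map
  }

  def removeBlock(blockId: BlockId): Unit = blockId match {
    case RDDBlockId(rddId, _) =>
      rddBlocks.get(rddId).foreach { blocks =>
        blocks.remove(blockId)
        if (blocks.isEmpty) rddBlocks.remove(rddId) // drop empty buckets
      }
    case _ =>
  }

  // O(blocks of this RDD), not O(all blocks).
  def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] =
    rddBlocks.get(rddId).map(_.toMap).getOrElse(Map.empty)
}
```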
