20 commits
53af15d  Refactor StorageStatus + add a bunch of tests (andrewor14, Jul 30, 2014)
41fa50d  Add a legacy constructor for StorageStatus (andrewor14, Jul 31, 2014)
7b2c4aa  Rewrite blockLocationsFromStorageStatus + clean up method signatures (andrewor14, Jul 31, 2014)
8e91921  Iterate through a filtered set of blocks when updating RDDInfo (andrewor14, Jul 31, 2014)
da8e322  Merge branch 'master' of github.com:apache/spark into fix-drop-events (andrewor14, Jul 31, 2014)
b12fcd7  Fix tests + simplify sc.getRDDStorageInfo (andrewor14, Jul 31, 2014)
a9ec384  Merge branch 'master' of github.com:apache/spark into fix-drop-events (andrewor14, Jul 31, 2014)
6a7b7c0  Avoid chained operations on TraversableLike (andrewor14, Jul 31, 2014)
b66b6b0  Use more efficient underlying data structures for blocks (andrewor14, Aug 1, 2014)
6fef86a  Add extensive tests for new code in StorageStatus (andrewor14, Aug 1, 2014)
2c3ef6a  Actually filter out only the relevant RDDs (andrewor14, Aug 1, 2014)
e080b9e  Reduce run time of StorageUtils.updateRddInfo to near constant (andrewor14, Aug 1, 2014)
6970bc8  Add extensive tests for StorageListener and the new code in StorageUtils (andrewor14, Aug 1, 2014)
af19bc0  *UsedByRDD -> *UsedByRdd (minor) (andrewor14, Aug 1, 2014)
8981de1  Merge branch 'master' of github.com:apache/spark into fix-drop-events (andrewor14, Aug 1, 2014)
bf6f09b  Minor changes (andrewor14, Aug 2, 2014)
a91be46  Make ExecutorsPage blazingly fast (andrewor14, Aug 2, 2014)
14fa1c3  Simplify some code + update a few comments (andrewor14, Aug 2, 2014)
e132d69  Merge branch 'master' of github.com:apache/spark into fix-drop-events (andrewor14, Aug 2, 2014)
f80c1fa  Rewrite fold and reduceOption as sum (andrewor14, Aug 2, 2014)
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkDeploySchedulerBackend, SimrSchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
-import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
+import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

@@ -843,7 +843,9 @@ class SparkContext(config: SparkConf) extends Logging {
    */
   @DeveloperApi
   def getRDDStorageInfo: Array[RDDInfo] = {
-    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
   }

   /**
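A quick usage sketch of the new code path (mine, not from the PR; the RDD and variable names are hypothetical): getRDDStorageInfo now builds one RDDInfo per persisted RDD, folds executor block statuses into them via StorageUtils.updateRddInfo, and keeps only the entries that are actually cached.

// Hypothetical usage, assuming a live SparkContext `sc`.
val data = sc.parallelize(1 to 1000).persist()   // mark for caching
data.count()                                     // materialize the cache
sc.getRDDStorageInfo.foreach { info =>
  // Only RDDs for which isCached holds survive the final filter.
  println(s"RDD ${info.id}: ${info.numCachedPartitions} cached partitions, ${info.memSize} bytes in memory")
}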
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

@@ -267,9 +267,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }

   private def storageStatus: Array[StorageStatus] = {
-    blockManagerInfo.map { case(blockManagerId, info) =>
-      val blockMap = mutable.Map[BlockId, BlockStatus](info.blocks.toSeq: _*)
-      new StorageStatus(blockManagerId, info.maxMem, blockMap)
+    blockManagerInfo.map { case (blockManagerId, info) =>
+      new StorageStatus(blockManagerId, info.maxMem, info.blocks)
     }.toArray
   }

@@ -424,7 +423,14 @@ case class BlockStatus(
     storageLevel: StorageLevel,
     memSize: Long,
     diskSize: Long,
-    tachyonSize: Long)
+    tachyonSize: Long) {
+  def isCached: Boolean = memSize + diskSize + tachyonSize > 0
+}
+
+@DeveloperApi
+object BlockStatus {
+  def empty: BlockStatus = BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
+}

 private[spark] class BlockManagerInfo(
     val blockManagerId: BlockManagerId,
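For illustration (a sketch, not part of the diff): the new isCached helper is true exactly when a block occupies space in at least one storage tier, and BlockStatus.empty is the all-zero placeholder that never counts as cached.

import org.apache.spark.storage.{BlockStatus, StorageLevel}

val gone = BlockStatus.empty                                       // StorageLevel.NONE, all sizes 0
val inMemory = BlockStatus(StorageLevel.MEMORY_ONLY, 1024L, 0L, 0L)
assert(!gone.isCached)      // 0 + 0 + 0 > 0 is false
assert(inMemory.isCached)   // 1024 bytes in memory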
core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala

@@ -30,36 +30,32 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+      val maxMem = storageStatusList.map(_.maxMem).sum
       maxMem / 1024 / 1024
     }
   })

   metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      val remainingMem = storageStatusList.map(_.memRemaining).sum
       remainingMem / 1024 / 1024
     }
   })

   metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      val maxMem = storageStatusList.map(_.maxMem).sum
+      val remainingMem = storageStatusList.map(_.memRemaining).sum
       (maxMem - remainingMem) / 1024 / 1024
     }
   })

   metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList
-        .flatMap(_.blocks.values.map(_.diskSize))
-        .reduceOption(_ + _)
-        .getOrElse(0L)
-
+      val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
       diskSpaceUsed / 1024 / 1024
     }
   })
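The motivation for the sum rewrite (my gloss; the PR itself only says "Rewrite fold and reduceOption as sum"): reduce throws on an empty collection, so defensive call sites needed reduceOption(_ + _).getOrElse(0L), whereas sum on a numeric collection already returns 0 for the empty case.

val sizes: Seq[Long] = Seq.empty
// sizes.reduce(_ + _)                                         // would throw UnsupportedOperationException
val viaReduceOption = sizes.reduceOption(_ + _).getOrElse(0L)  // 0L, but verbose
val viaSum = sizes.sum                                         // 0L, one idiomatic call
assert(viaReduceOption == viaSum)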
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -34,6 +34,8 @@ class RDDInfo(
   var diskSize = 0L
   var tachyonSize = 0L

+  def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
+
   override def toString = {
     import Utils.bytesToString
     ("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala

@@ -35,13 +35,12 @@ class StorageStatusListener extends SparkListener {

   /** Update storage status list to reflect updated block statuses */
   private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
-    val filteredStatus = executorIdToStorageStatus.get(execId)
-    filteredStatus.foreach { storageStatus =>
+    executorIdToStorageStatus.get(execId).foreach { storageStatus =>
       updatedBlocks.foreach { case (blockId, updatedStatus) =>
         if (updatedStatus.storageLevel == StorageLevel.NONE) {
-          storageStatus.blocks.remove(blockId)
+          storageStatus.removeBlock(blockId)
         } else {
-          storageStatus.blocks(blockId) = updatedStatus
+          storageStatus.updateBlock(blockId, updatedStatus)
         }
       }
     }
@@ -50,9 +49,8 @@
   /** Update storage status list to reflect the removal of an RDD from the cache */
   private def updateStorageStatus(unpersistedRDDId: Int) {
     storageStatusList.foreach { storageStatus =>
-      val unpersistedBlocksIds = storageStatus.rddBlocks.keys.filter(_.rddId == unpersistedRDDId)
-      unpersistedBlocksIds.foreach { blockId =>
-        storageStatus.blocks.remove(blockId)
+      storageStatus.rddBlocksById(unpersistedRDDId).foreach { case (blockId, _) =>
+        storageStatus.removeBlock(blockId)
       }
     }
   }
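The rddBlocksById call above comes from this PR's reworked StorageStatus. As a hedged sketch of the underlying idea (hypothetical class; the real implementation differs in its details), keeping a per-RDD index next to the flat block map is what lets the listener touch only one RDD's blocks instead of filtering every block it tracks:

import scala.collection.mutable
import org.apache.spark.storage.{BlockId, BlockStatus, RDDBlockId}

class IndexedBlockMap {
  private val flat = mutable.HashMap[BlockId, BlockStatus]()
  private val byRdd = mutable.HashMap[Int, mutable.HashMap[BlockId, BlockStatus]]()

  def updateBlock(blockId: BlockId, status: BlockStatus): Unit = {
    flat(blockId) = status
    blockId match {
      case RDDBlockId(rddId, _) =>
        byRdd.getOrElseUpdate(rddId, mutable.HashMap())(blockId) = status
      case _ => // non-RDD blocks live only in the flat map
    }
  }

  def removeBlock(blockId: BlockId): Unit = {
    flat.remove(blockId)
    blockId match {
      case RDDBlockId(rddId, _) => byRdd.get(rddId).foreach(_.remove(blockId))
      case _ =>
    }
  }

  // One hash lookup instead of a scan over every block.
  def rddBlocksById(rddId: Int): Map[BlockId, BlockStatus] =
    byRdd.get(rddId).map(_.toMap).getOrElse(Map.empty)
}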