Skip to content

Commit f80c1fa

Browse files
committed
Rewrite fold and reduceOption as sum
1 parent e132d69 commit f80c1fa

File tree

4 files changed

+19
-19
lines changed

4 files changed

+19
-19
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,32 +30,32 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
3030
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
3131
override def getValue: Long = {
3232
val storageStatusList = blockManager.master.getStorageStatus
33-
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
33+
val maxMem = storageStatusList.map(_.maxMem).sum
3434
maxMem / 1024 / 1024
3535
}
3636
})
3737

3838
metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MB"), new Gauge[Long] {
3939
override def getValue: Long = {
4040
val storageStatusList = blockManager.master.getStorageStatus
41-
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
41+
val remainingMem = storageStatusList.map(_.memRemaining).sum
4242
remainingMem / 1024 / 1024
4343
}
4444
})
4545

4646
metricRegistry.register(MetricRegistry.name("memory", "memUsed_MB"), new Gauge[Long] {
4747
override def getValue: Long = {
4848
val storageStatusList = blockManager.master.getStorageStatus
49-
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
50-
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
49+
val maxMem = storageStatusList.map(_.maxMem).sum
50+
val remainingMem = storageStatusList.map(_.memRemaining).sum
5151
(maxMem - remainingMem) / 1024 / 1024
5252
}
5353
})
5454

5555
metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MB"), new Gauge[Long] {
5656
override def getValue: Long = {
5757
val storageStatusList = blockManager.master.getStorageStatus
58-
val diskSpaceUsed = storageStatusList.map(_.diskUsed).reduceOption(_ + _).getOrElse(0L)
58+
val diskSpaceUsed = storageStatusList.map(_.diskUsed).sum
5959
diskSpaceUsed / 1024 / 1024
6060
}
6161
})

core/src/main/scala/org/apache/spark/storage/StorageUtils.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
159159
* Return the number of RDD blocks stored in this block manager in O(RDDs) time.
160160
* Note that this is much faster than `this.rddBlocks.size`, which is O(RDD blocks) time.
161161
*/
162-
def numRddBlocks: Int = _rddBlocks.values.map(_.size).fold(0)(_ + _)
162+
def numRddBlocks: Int = _rddBlocks.values.map(_.size).sum
163163

164164
/**
165165
* Return the number of blocks that belong to the given RDD in O(1) time.
@@ -173,15 +173,15 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
173173

174174
/** Return the memory used by this block manager. */
175175
def memUsed: Long =
176-
_nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).fold(0L)(_ + _)
176+
_nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
177177

178178
/** Return the disk space used by this block manager. */
179179
def diskUsed: Long =
180-
_nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).fold(0L)(_ + _)
180+
_nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
181181

182182
/** Return the off-heap space used by this block manager. */
183183
def offHeapUsed: Long =
184-
_nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).fold(0L)(_ + _)
184+
_nonRddStorageInfo._3 + _rddBlocks.keys.toSeq.map(offHeapUsedByRdd).sum
185185

186186
/** Return the memory used by the given RDD in this block manager in O(1) time. */
187187
def memUsedByRdd(rddId: Int): Long = _rddStorageInfo.get(rddId).map(_._1).getOrElse(0L)
@@ -247,10 +247,10 @@ private[spark] object StorageUtils {
247247
// Assume all blocks belonging to the same RDD have the same storage level
248248
val storageLevel = statuses
249249
.map(_.rddStorageLevel(rddId)).flatMap(s => s).headOption.getOrElse(StorageLevel.NONE)
250-
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).fold(0)(_ + _)
251-
val memSize = statuses.map(_.memUsedByRdd(rddId)).fold(0L)(_ + _)
252-
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).fold(0L)(_ + _)
253-
val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).fold(0L)(_ + _)
250+
val numCachedPartitions = statuses.map(_.numRddBlocksById(rddId)).sum
251+
val memSize = statuses.map(_.memUsedByRdd(rddId)).sum
252+
val diskSize = statuses.map(_.diskUsedByRdd(rddId)).sum
253+
val tachyonSize = statuses.map(_.offHeapUsedByRdd(rddId)).sum
254254

255255
rddInfo.storageLevel = storageLevel
256256
rddInfo.numCachedPartitions = numCachedPartitions

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
4949

5050
def render(request: HttpServletRequest): Seq[Node] = {
5151
val storageStatusList = listener.storageStatusList
52-
val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
53-
val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
54-
val diskUsed = storageStatusList.map(_.diskUsed).fold(0L)(_ + _)
52+
val maxMem = storageStatusList.map(_.maxMem).sum
53+
val memUsed = storageStatusList.map(_.memUsed).sum
54+
val diskUsed = storageStatusList.map(_.diskUsed).sum
5555
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
5656
val execInfoSorted = execInfo.sortBy(_.id)
5757

core/src/test/scala/org/apache/spark/storage/StorageSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -242,9 +242,9 @@ class StorageSuite extends FunSuite {
242242

243243
test("storage status memUsed, diskUsed, tachyonUsed") {
244244
val status = storageStatus2
245-
def actualMemUsed: Long = status.blocks.values.map(_.memSize).fold(0L)(_ + _)
246-
def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).fold(0L)(_ + _)
247-
def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).fold(0L)(_ + _)
245+
def actualMemUsed: Long = status.blocks.values.map(_.memSize).sum
246+
def actualDiskUsed: Long = status.blocks.values.map(_.diskSize).sum
247+
def actualOffHeapUsed: Long = status.blocks.values.map(_.tachyonSize).sum
248248
assert(status.memUsed === actualMemUsed)
249249
assert(status.diskUsed === actualDiskUsed)
250250
assert(status.offHeapUsed === actualOffHeapUsed)

0 commit comments

Comments (0)