
Commit d2f21b0

Marcelo Vanzin authored and squito committed
[SPARK-27468][CORE] Track correct storage level of RDDs and partitions
Previously, the RDD storage level would change depending on the status reported by executors for the blocks they were storing, and individual blocks would reflect that. That is wrong because different blocks may be stored differently in different executors.

So now the RDD tracks the user-provided storage level, while the individual partitions reflect the current storage level of that particular block, including the current number of replicas.

Closes #25779 from vanzin/SPARK-27468.

Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Imran Rashid <[email protected]>
1 parent 64fe82b commit d2f21b0

File tree

4 files changed: +68 -34 lines changed

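The core of the change in one rule: a partition's effective storage level is derived from what the block actually occupies, not copied from the RDD. A minimal sketch of that rule in Scala, distilled from the LiveRDDPartition.update hunk below (the helper name effectiveLevel is illustrative, not part of the patch):

    import org.apache.spark.storage.StorageLevel

    // Mirrors the expression added to LiveRDDPartition.update: the effective
    // level reflects the block's actual footprint and replica count, while
    // `rddLevel` is the level the user requested via persist().
    def effectiveLevel(
        rddLevel: StorageLevel,
        memoryUsed: Long,
        diskUsed: Long,
        executors: Seq[String]): StorageLevel = {
      StorageLevel(
        diskUsed > 0,        // useDisk: the block currently has bytes on disk
        memoryUsed > 0,      // useMemory: the block currently has bytes in memory
        rddLevel.useOffHeap, // off-heap use follows the requested level
        if (memoryUsed > 0) rddLevel.deserialized else false, // only relevant in memory
        executors.size)      // replication: executors currently holding the block
    }

The RDD itself keeps the user-provided level for its whole lifetime; only its partitions report this derived level.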

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 5 additions & 15 deletions
@@ -234,8 +234,8 @@ private[spark] class AppStatusListener(
           (partition.memoryUsed / partition.executors.length) * -1)
         rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
           (partition.diskUsed / partition.executors.length) * -1)
-        partition.update(partition.executors
-          .filter(!_.equals(event.executorId)), rdd.storageLevel,
+        partition.update(
+          partition.executors.filter(!_.equals(event.executorId)),
           addDeltaToValue(partition.memoryUsed,
             (partition.memoryUsed / partition.executors.length) * -1),
           addDeltaToValue(partition.diskUsed,
@@ -495,7 +495,7 @@ private[spark] class AppStatusListener(

     event.stageInfo.rddInfos.foreach { info =>
       if (info.storageLevel.isValid) {
-        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
+        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info, info.storageLevel)), now)
       }
     }

@@ -916,12 +916,6 @@ private[spark] class AppStatusListener(
     val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
     val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)

-    val updatedStorageLevel = if (storageLevel.isValid) {
-      Some(storageLevel.description)
-    } else {
-      None
-    }
-
     // We need information about the executor to update some memory accounting values in the
     // RDD info, so read that beforehand.
     val maybeExec = liveExecutors.get(executorId)
@@ -936,13 +930,9 @@ private[spark] class AppStatusListener(
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
-      if (updatedStorageLevel.isDefined) {
-        rdd.setStorageLevel(updatedStorageLevel.get)
-      }
-
       val partition = rdd.partition(block.name)

-      val executors = if (updatedStorageLevel.isDefined) {
+      val executors = if (storageLevel.isValid) {
         val current = partition.executors
         if (current.contains(executorId)) {
           current
@@ -957,7 +947,7 @@ private[spark] class AppStatusListener(

       // Only update the partition if it's still stored in some executor, otherwise get rid of it.
       if (executors.nonEmpty) {
-        partition.update(executors, rdd.storageLevel,
+        partition.update(executors,
           addDeltaToValue(partition.memoryUsed, memoryDelta),
           addDeltaToValue(partition.diskUsed, diskDelta))
       } else {

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 24 additions & 13 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
-import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet
@@ -458,7 +458,13 @@ private class LiveStage extends LiveEntity {

 }

-private class LiveRDDPartition(val blockName: String) {
+/**
+ * Data about a single partition of a cached RDD. The RDD storage level is used to compute the
+ * effective storage level of the partition, which takes into account the storage actually being
+ * used by the partition in the executors, and thus may differ from the storage level requested
+ * by the application.
+ */
+private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {

   import LiveEntityHelpers._

@@ -476,12 +482,13 @@ private class LiveRDDPartition(val blockName: String) {

   def update(
       executors: Seq[String],
-      storageLevel: String,
       memoryUsed: Long,
       diskUsed: Long): Unit = {
+    val level = StorageLevel(diskUsed > 0, memoryUsed > 0, rddLevel.useOffHeap,
+      if (memoryUsed > 0) rddLevel.deserialized else false, executors.size)
     value = new v1.RDDPartitionInfo(
       blockName,
-      weakIntern(storageLevel),
+      weakIntern(level.description),
       memoryUsed,
       diskUsed,
       executors)
@@ -520,27 +527,31 @@ private class LiveRDDDistribution(exec: LiveExecutor) {

 }

-private class LiveRDD(val info: RDDInfo) extends LiveEntity {
+/**
+ * Tracker for data related to a persisted RDD.
+ *
+ * The RDD storage level is immutable, following the current behavior of `RDD.persist()`, even
+ * though it is mutable in the `RDDInfo` structure. Since the listener does not track unpersisted
+ * RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later
+ * stage is started after the RDD is marked for caching.
+ */
+private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity {

   import LiveEntityHelpers._

-  var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L

+  private val levelDescription = weakIntern(storageLevel.description)
   private val partitions = new HashMap[String, LiveRDDPartition]()
   private val partitionSeq = new RDDPartitionSeq()

   private val distributions = new HashMap[String, LiveRDDDistribution]()

-  def setStorageLevel(level: String): Unit = {
-    this.storageLevel = weakIntern(level)
-  }
-
   def partition(blockName: String): LiveRDDPartition = {
     partitions.getOrElseUpdate(blockName, {
-      val part = new LiveRDDPartition(blockName)
-      part.update(Nil, storageLevel, 0L, 0L)
+      val part = new LiveRDDPartition(blockName, storageLevel)
+      part.update(Nil, 0L, 0L)
       partitionSeq.addPartition(part)
       part
     })
@@ -578,7 +589,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
       info.name,
       info.numPartitions,
       partitions.size,
-      storageLevel,
+      levelDescription,
       memoryUsed,
       diskUsed,
       dists,
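To make the split concrete: for an RDD persisted as MEMORY_AND_DISK whose block gets evicted to disk on a single executor, the RDD summary keeps the requested level while the partition reports the derived one. A hypothetical spark-shell check, assuming the usual StorageLevel.description format:

    import org.apache.spark.storage.StorageLevel

    val requested = StorageLevel.MEMORY_AND_DISK
    // After eviction the block only has bytes on disk, on one executor.
    val derived = StorageLevel(true, false, requested.useOffHeap, false, 1)

    println(requested.description) // Disk Memory Deserialized 1x Replicated
    println(derived.description)   // Disk Serialized 1x Replicated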

core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 36 additions & 4 deletions
@@ -42,6 +42,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
     .set(ASYNC_TRACKING_ENABLED, false)

+  private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)
+
   private var time: Long = _
   private var testDir: File = _
   private var store: ElementTrackingStore = _
@@ -697,8 +699,16 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val rdd2b1 = RddBlock(2, 1, 5L, 6L)
     val level = StorageLevel.MEMORY_AND_DISK

+    // Submit a stage for the first RDD before it's marked for caching, to make sure later
+    // the listener picks up the correct storage level.
+    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, StorageLevel.NONE, false, Nil)
+    val stage0 = new StageInfo(0, 0, "stage0", 4, Seq(rdd1Info), Nil, "details0")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage0, new Properties()))
+    listener.onStageCompleted(SparkListenerStageCompleted(stage0))
+    assert(store.count(classOf[RDDStorageInfoWrapper]) === 0)
+
     // Submit a stage and make sure the RDDs are recorded.
-    val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil)
+    rdd1Info.storageLevel = level
     val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, false, Nil)
     val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1")
     listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
@@ -763,6 +773,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(part.memoryUsed === rdd1b1.memSize * 2)
       assert(part.diskUsed === rdd1b1.diskSize * 2)
       assert(part.executors === Seq(bm1.executorId, bm2.executorId))
+      assert(part.storageLevel === twoReplicaMemAndDiskLevel.description)
     }

     check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
@@ -800,9 +811,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
     }

-    // Remove block 1 from bm 1.
+    // Evict block 1 from memory in bm 1. Note that because of SPARK-29319, the disk size
+    // is reported as "0" here to avoid double-counting; the current behavior of the block
+    // manager is to provide the actual disk size of the block.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.DISK_ONLY,
+        rdd1b1.memSize, 0L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.numCachedPartitions === 2L)
+      assert(wrapper.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
+      assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 2L)
+      assert(exec.info.memoryUsed === rdd1b2.memSize)
+      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
+    }
+
+    // Remove block 1 from bm 1; note memSize = 0 due to the eviction above.
     listener.onBlockUpdated(SparkListenerBlockUpdated(
-      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, rdd1b1.memSize, rdd1b1.diskSize)))
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, 0, rdd1b1.diskSize)))

     check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
       assert(wrapper.info.numCachedPartitions === 2L)
@@ -1571,7 +1603,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

       val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
-      assert(part1.storageLevel === level.description)
+      assert(part1.storageLevel === twoReplicaMemAndDiskLevel.description)
       assert(part1.memoryUsed === 2 * rdd1b1.memSize)
       assert(part1.diskUsed === 2 * rdd1b1.diskSize)
       assert(part1.executors === Seq(bm1.executorId, bm2.executorId))
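On the SPARK-29319 note in the test above: the listener applies reported block sizes as deltas, so an eviction event carrying the block's real disk size would be added on top of the disk bytes already tracked. A small sketch of that delta arithmetic, with illustrative names:

    // memSize counts against memory when the new level no longer uses it;
    // a diskSize of 0 leaves the already-tracked disk bytes unchanged.
    def delta(size: Long, stillUsed: Boolean): Long = if (stillUsed) size else -size

    val memSize = 1L
    val memoryDelta = delta(memSize, stillUsed = false) // -1: the block left memory
    val diskDelta = delta(0L, stillUsed = true)         //  0: no double-counting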

core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala

Lines changed: 3 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.status

 import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel

 class LiveEntitySuite extends SparkFunSuite {

@@ -59,8 +60,8 @@ class LiveEntitySuite extends SparkFunSuite {
   }

   private def newPartition(i: Int): LiveRDDPartition = {
-    val part = new LiveRDDPartition(i.toString)
-    part.update(Seq(i.toString), i.toString, i, i)
+    val part = new LiveRDDPartition(i.toString, StorageLevel.MEMORY_AND_DISK)
+    part.update(Seq(i.toString), i, i)
     part
   }
