Commit 2c3ef6a

Actually filter out only the relevant RDDs
Prior to this commit, the changes in the PR yielded little performance improvement under any workload. This is because we updated all RDDInfos, rather than only the ones whose blocks were actually updated. Thus, even though the new filter logic in StorageStatus is correct, we still iterated through all the RDD blocks every time a task reported an updated block. This commit avoids that by calling StorageUtils.updateRddInfo only on the RDDs that need to be updated.
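To make the shape of the fix concrete, here is a minimal, self-contained Scala sketch of the same filtering idea, using simplified stand-in types (SimpleBlockId and SimpleRDDInfo are hypothetical placeholders, not Spark's BlockId/RDDInfo): collect the set of RDD ids touched by the updated blocks, then filter the RDDInfos against that set.

    object RddFilterSketch {
      // Hypothetical stand-ins for Spark's BlockId and RDDInfo. A block either
      // belongs to an RDD (Some(rddId)) or to something else (broadcast,
      // shuffle, ...), modeled here as None.
      case class SimpleBlockId(rddId: Option[Int])
      case class SimpleRDDInfo(id: Int, name: String)

      /** Keep only the RDDInfos whose RDD owns at least one updated block. */
      def infosToUpdate(
          allInfos: Seq[SimpleRDDInfo],
          updatedBlocks: Seq[SimpleBlockId]): Seq[SimpleRDDInfo] = {
        // Distinct RDD ids touched by this update; a Set gives O(1) membership
        // tests, so the filter is O(#blocks + #RDDInfos) instead of rescanning
        // every RDD's blocks on every task completion.
        val touched: Set[Int] = updatedBlocks.flatMap(_.rddId).toSet
        allInfos.filter(info => touched.contains(info.id))
      }

      def main(args: Array[String]): Unit = {
        val infos = Seq(SimpleRDDInfo(0, "a"), SimpleRDDInfo(1, "b"), SimpleRDDInfo(2, "c"))
        val updates = Seq(SimpleBlockId(Some(1)), SimpleBlockId(None))
        // Only RDD 1 owns an updated block, so only its info would be rescanned.
        println(infosToUpdate(infos, updates)) // List(SimpleRDDInfo(1,b))
      }
    }

The actual change below follows the same pattern, using Spark's BlockId.asRDDId to discard non-RDD blocks before building the id set.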
Parent: 6fef86a

core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala

Lines changed: 4 additions & 2 deletions
@@ -48,9 +48,11 @@ class StorageListener(storageStatusListener: StorageStatusListener) extends Spar
   /** Filter RDD info to include only those with cached partitions */
   def rddInfoList = _rddInfoMap.values.filter(_.numCachedPartitions > 0).toSeq
 
-  /** Update each RDD's info to reflect any updates in the RDD's storage status */
+  /** Update the storage info of the RDDs whose blocks are among the given updated blocks */
   private def updateRDDInfo(updatedBlocks: Seq[(BlockId, BlockStatus)]): Unit = {
-    StorageUtils.updateRddInfo(_rddInfoMap.values.toSeq, storageStatusList, updatedBlocks)
+    val rddIdsToUpdate = updatedBlocks.flatMap { case (bid, _) => bid.asRDDId.map(_.rddId) }.toSet
+    val rddInfosToUpdate = _rddInfoMap.values.toSeq.filter { s => rddIdsToUpdate.contains(s.id) }
+    StorageUtils.updateRddInfo(rddInfosToUpdate, storageStatusList, updatedBlocks)
   }
 
   /**
