@@ -122,3 +122,7 @@
stroke: #52C366;
stroke-width: 2px;
}

.tooltip-inner {
white-space: pre-wrap;
}
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDDOperationScope, RDD}
import org.apache.spark.util.Utils
import org.apache.spark.util.{CallSite, Utils}

@DeveloperApi
class RDDInfo(
@@ -28,9 +28,20 @@ class RDDInfo(
val numPartitions: Int,
var storageLevel: StorageLevel,
val parentIds: Seq[Int],
val callSite: CallSite,
val scope: Option[RDDOperationScope] = None)
extends Ordered[RDDInfo] {

def this(
id: Int,
name: String,
numPartitions: Int,
storageLevel: StorageLevel,
parentIds: Seq[Int],
scope: Option[RDDOperationScope] = None) {
this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope)
}
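
The auxiliary constructor keeps the old five-argument signature source-compatible: existing callers compile unchanged and pick up `CallSite.empty`. A minimal sketch of both entry points (values are hypothetical, and since `CallSite` is `private[spark]` this only compiles inside Spark's own source tree):

```scala
import org.apache.spark.storage.{RDDInfo, StorageLevel}
import org.apache.spark.util.CallSite

// Legacy five-argument signature: callSite silently defaults to CallSite.empty.
val legacy = new RDDInfo(0, "words", 2, StorageLevel.MEMORY_ONLY, Seq.empty)

// New signature: the creation site is passed through explicitly.
val withSite = new RDDInfo(1, "pairs", 2, StorageLevel.NONE, Seq(0),
  CallSite("map at WordCount.scala:12", "org.example.WordCount.main(WordCount.scala:12)"))
```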

var numCachedPartitions = 0
var memSize = 0L
var diskSize = 0L
@@ -56,6 +67,7 @@ private[spark] object RDDInfo {
def fromRdd(rdd: RDD[_]): RDDInfo = {
val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
val parentIds = rdd.dependencies.map(_.rdd.id)
new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope)
new RDDInfo(rdd.id, rddName, rdd.partitions.length,
rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope)
}
}
core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer}
import org.apache.spark.Logging
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.CallSite

/**
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph(
rootCluster: RDDOperationCluster)

/** A node in an RDDOperationGraph. This represents an RDD. */
private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite)

/**
* A directed edge connecting two nodes in an RDDOperationGraph.
@@ -104,8 +105,8 @@ private[ui] object RDDOperationGraph extends Logging {
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }

// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
val node = nodes.getOrElseUpdate(
rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))

if (rdd.scope.isEmpty) {
// This RDD has no encompassing scope, so we put it directly in the root cluster
@@ -177,7 +178,8 @@

/** Return the dot representation of a node in an RDDOperationGraph. */
private def makeDotNode(node: RDDOperationNode): String = {
s"""${node.id} [label="${node.name} [${node.id}]"]"""
val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
s"""${node.id} [label="$label"]"""
}
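
For illustration, the label is now two lines; the `\n` embedded here is presumably why the `.tooltip-inner { white-space: pre-wrap }` rule was added earlier in this diff, so such line breaks survive in the DAG-viz tooltips. A sketch with a hypothetical node (`RDDOperationNode` is `private[ui]`, so treat this as illustrative):

```scala
val node = RDDOperationNode(2, "MapPartitionsRDD", cached = false,
  callsite = CallSite("map at WordCount.scala:12", "..."))
// makeDotNode(node) renders a two-line DOT label:
//   2 [label="MapPartitionsRDD [2]
//   map at WordCount.scala:12"]
```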

/** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */
17 changes: 16 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -397,6 +397,7 @@ private[spark] object JsonProtocol {
("RDD ID" -> rddInfo.id) ~
("Name" -> rddInfo.name) ~
("Scope" -> rddInfo.scope.map(_.toJson)) ~
("Callsite" -> callsiteToJson(rddInfo.callSite)) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
@@ -406,6 +407,11 @@
("Disk Size" -> rddInfo.diskSize)
}

def callsiteToJson(callsite: CallSite): JValue = {
("Short Form" -> callsite.shortForm) ~
("Long Form" -> callsite.longForm)
}
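
For a quick sense of the wire format, a sketch with hypothetical values, using the json4s rendering that JsonProtocol already relies on:

```scala
import org.json4s.jackson.JsonMethods.{compact, render}

val js = compact(render(JsonProtocol.callsiteToJson(CallSite("short", "long"))))
// js == """{"Short Form":"short","Long Form":"long"}"""
```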

def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
@@ -846,6 +852,9 @@ private[spark] object JsonProtocol {
val scope = Utils.jsonOption(json \ "Scope")
.map(_.extract[String])
.map(RDDOperationScope.fromJson)
val callsite = Utils.jsonOption(json \ "Callsite")
.map(callsiteFromJson)
.getOrElse(CallSite.empty)
val parentIds = Utils.jsonOption(json \ "Parent IDs")
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
.getOrElse(Seq.empty)
@@ -858,14 +867,20 @@
.getOrElse(json \ "Tachyon Size").extract[Long]
val diskSize = (json \ "Disk Size").extract[Long]

val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope)
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
rddInfo.numCachedPartitions = numCachedPartitions
rddInfo.memSize = memSize
rddInfo.externalBlockStoreSize = externalBlockStoreSize
rddInfo.diskSize = diskSize
rddInfo
}

def callsiteFromJson(json: JValue): CallSite = {
val shortForm = (json \ "Short Form").extract[String]
val longForm = (json \ "Long Form").extract[String]
CallSite(shortForm, longForm)
}
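
The reader round-trips with `callsiteToJson`, and `rddInfoFromJson` above falls back to `CallSite.empty` when the "Callsite" field is absent, which keeps event logs written by older Spark versions parseable (this is what the backward-compatibility test below exercises). A round-trip sketch with hypothetical values:

```scala
val cs = CallSite("map at WordCount.scala:12", "org.example.WordCount.main(WordCount.scala:12)")
assert(JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(cs)) == cs)
```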

def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -57,6 +57,7 @@ private[spark] case class CallSite(shortForm: String, longForm: String)
private[spark] object CallSite {
val SHORT_FORM = "callSite.short"
val LONG_FORM = "callSite.long"
val empty = CallSite("", "")
}

/**
14 changes: 7 additions & 7 deletions core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -615,29 +615,29 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
"label=\"Stage 0\";\n subgraph "))
assert(stage0.contains("{\n label=\"parallelize\";\n " +
"0 [label=\"ParallelCollectionRDD [0]\"];\n }"))
"0 [label=\"ParallelCollectionRDD [0]"))
assert(stage0.contains("{\n label=\"map\";\n " +
"1 [label=\"MapPartitionsRDD [1]\"];\n }"))
"1 [label=\"MapPartitionsRDD [1]"))
assert(stage0.contains("{\n label=\"groupBy\";\n " +
"2 [label=\"MapPartitionsRDD [2]\"];\n }"))
"2 [label=\"MapPartitionsRDD [2]"))

val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
"/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
"label=\"Stage 1\";\n subgraph "))
assert(stage1.contains("{\n label=\"groupBy\";\n " +
"3 [label=\"ShuffledRDD [3]\"];\n }"))
"3 [label=\"ShuffledRDD [3]"))
assert(stage1.contains("{\n label=\"map\";\n " +
"4 [label=\"MapPartitionsRDD [4]\"];\n }"))
"4 [label=\"MapPartitionsRDD [4]"))
assert(stage1.contains("{\n label=\"groupBy\";\n " +
"5 [label=\"MapPartitionsRDD [5]\"];\n }"))
"5 [label=\"MapPartitionsRDD [5]"))

val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
"/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
"label=\"Stage 2\";\n subgraph "))
assert(stage2.contains("{\n label=\"groupBy\";\n " +
"6 [label=\"ShuffledRDD [6]\"];\n }"))
"6 [label=\"ShuffledRDD [6]"))
}
}
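
Note on the shortened assertions above: the DOT label now ends with each RDD's callsite, which varies with the test's own source location, so the checks match only the stable prefix of the label rather than the full `\"];\n }` suffix.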

37 changes: 31 additions & 6 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -111,6 +111,7 @@ class JsonProtocolSuite extends SparkFunSuite {
test("Dependent Classes") {
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testCallsite(CallSite("happy", "birthday"))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
@@ -163,6 +164,10 @@
testBlockId(StreamBlockId(1, 2L))
}

/* ============================== *
| Backward compatibility tests |
* ============================== */

test("ExceptionFailure backward compatibility") {
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null,
None, None)
@@ -334,14 +339,17 @@
assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
}

test("RDDInfo backward compatibility (scope, parent IDs)") {
// Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
val rddInfo = new RDDInfo(
1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable")))
test("RDDInfo backward compatibility (scope, parent IDs, callsite)") {
// "Scope" and "Parent IDs" were introduced in Spark 1.4.0
// "Callsite" was introduced in Spark 1.6.0
val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8),
CallSite("short", "long"), Some(new RDDOperationScope("fable")))
val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
.removeField({ _._1 == "Parent IDs"})
.removeField({ _._1 == "Scope"})
val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None)
.removeField({ _._1 == "Callsite"})
val expectedRddInfo = new RDDInfo(
1, "one", 100, StorageLevel.NONE, Seq.empty, CallSite.empty, scope = None)
assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
}

@@ -389,6 +397,11 @@
assertEquals(info, newInfo)
}

private def testCallsite(callsite: CallSite): Unit = {
val newCallsite = JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(callsite))
assert(callsite === newCallsite)
}

private def testStageInfo(info: StageInfo) {
val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
assertEquals(info, newInfo)
@@ -712,7 +725,8 @@
}

private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7))
val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK,
Seq(1, 4, 7), CallSite(a.toString, b.toString))
r.numCachedPartitions = c
r.memSize = d
r.diskSize = e
@@ -855,6 +869,7 @@
| {
| "RDD ID": 101,
| "Name": "mayor",
| "Callsite": {"Short Form": "101", "Long Form": "201"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1257,6 +1272,7 @@
| {
| "RDD ID": 1,
| "Name": "mayor",
| "Callsite": {"Short Form": "1", "Long Form": "200"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1300,6 +1316,7 @@
| {
| "RDD ID": 2,
| "Name": "mayor",
| "Callsite": {"Short Form": "2", "Long Form": "400"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1317,6 +1334,7 @@
| {
| "RDD ID": 3,
| "Name": "mayor",
| "Callsite": {"Short Form": "3", "Long Form": "401"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1360,6 +1378,7 @@
| {
| "RDD ID": 3,
| "Name": "mayor",
| "Callsite": {"Short Form": "3", "Long Form": "600"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1377,6 +1396,7 @@
| {
| "RDD ID": 4,
| "Name": "mayor",
| "Callsite": {"Short Form": "4", "Long Form": "601"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1394,6 +1414,7 @@
| {
| "RDD ID": 5,
| "Name": "mayor",
| "Callsite": {"Short Form": "5", "Long Form": "602"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1437,6 +1458,7 @@
| {
| "RDD ID": 4,
| "Name": "mayor",
| "Callsite": {"Short Form": "4", "Long Form": "800"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1454,6 +1476,7 @@
| {
| "RDD ID": 5,
| "Name": "mayor",
| "Callsite": {"Short Form": "5", "Long Form": "801"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1471,6 +1494,7 @@
| {
| "RDD ID": 6,
| "Name": "mayor",
| "Callsite": {"Short Form": "6", "Long Form": "802"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,
@@ -1488,6 +1512,7 @@
| {
| "RDD ID": 7,
| "Name": "mayor",
| "Callsite": {"Short Form": "7", "Long Form": "803"},
| "Parent IDs": [1, 4, 7],
| "Storage Level": {
| "Use Disk": true,