Skip to content

Commit 42d933f

Browse files
Andrew Orrxin
authored andcommitted
[SPARK-11112] DAG visualization: display RDD callsite
<img width="548" alt="screen shot 2015-11-01 at 9 42 33 am" src="https://cloud.githubusercontent.com/assets/2133137/10870343/2a8cd070-807d-11e5-857a-4ebcace77b5b.png"> mateiz sarutak Author: Andrew Or <[email protected]> Closes #9398 from andrewor14/rdd-callsite. (cherry picked from commit 7f74190) Signed-off-by: Reynold Xin <[email protected]>
1 parent c859be2 commit 42d933f

File tree

7 files changed

+79
-20
lines changed

7 files changed

+79
-20
lines changed

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,7 @@
122122
stroke: #52C366;
123123
stroke-width: 2px;
124124
}
125+
126+
.tooltip-inner {
127+
white-space: pre-wrap;
128+
}

core/src/main/scala/org/apache/spark/storage/RDDInfo.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.storage
1919

2020
import org.apache.spark.annotation.DeveloperApi
2121
import org.apache.spark.rdd.{RDDOperationScope, RDD}
22-
import org.apache.spark.util.Utils
22+
import org.apache.spark.util.{CallSite, Utils}
2323

2424
@DeveloperApi
2525
class RDDInfo(
@@ -28,9 +28,20 @@ class RDDInfo(
2828
val numPartitions: Int,
2929
var storageLevel: StorageLevel,
3030
val parentIds: Seq[Int],
31+
val callSite: CallSite,
3132
val scope: Option[RDDOperationScope] = None)
3233
extends Ordered[RDDInfo] {
3334

35+
def this(
36+
id: Int,
37+
name: String,
38+
numPartitions: Int,
39+
storageLevel: StorageLevel,
40+
parentIds: Seq[Int],
41+
scope: Option[RDDOperationScope] = None) {
42+
this(id, name, numPartitions, storageLevel, parentIds, CallSite.empty, scope)
43+
}
44+
3445
var numCachedPartitions = 0
3546
var memSize = 0L
3647
var diskSize = 0L
@@ -56,6 +67,7 @@ private[spark] object RDDInfo {
5667
def fromRdd(rdd: RDD[_]): RDDInfo = {
5768
val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
5869
val parentIds = rdd.dependencies.map(_.rdd.id)
59-
new RDDInfo(rdd.id, rddName, rdd.partitions.length, rdd.getStorageLevel, parentIds, rdd.scope)
70+
new RDDInfo(rdd.id, rddName, rdd.partitions.length,
71+
rdd.getStorageLevel, parentIds, rdd.creationSite, rdd.scope)
6072
}
6173
}

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import scala.collection.mutable.{StringBuilder, ListBuffer}
2323
import org.apache.spark.Logging
2424
import org.apache.spark.scheduler.StageInfo
2525
import org.apache.spark.storage.StorageLevel
26+
import org.apache.spark.util.CallSite
2627

2728
/**
2829
* A representation of a generic cluster graph used for storing information on RDD operations.
@@ -38,7 +39,7 @@ private[ui] case class RDDOperationGraph(
3839
rootCluster: RDDOperationCluster)
3940

4041
/** A node in an RDDOperationGraph. This represents an RDD. */
41-
private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean)
42+
private[ui] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: CallSite)
4243

4344
/**
4445
* A directed edge connecting two nodes in an RDDOperationGraph.
@@ -104,8 +105,8 @@ private[ui] object RDDOperationGraph extends Logging {
104105
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
105106

106107
// TODO: differentiate between the intention to cache an RDD and whether it's actually cached
107-
val node = nodes.getOrElseUpdate(
108-
rdd.id, RDDOperationNode(rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE))
108+
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(
109+
rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite))
109110

110111
if (rdd.scope.isEmpty) {
111112
// This RDD has no encompassing scope, so we put it directly in the root cluster
@@ -177,7 +178,8 @@ private[ui] object RDDOperationGraph extends Logging {
177178

178179
/** Return the dot representation of a node in an RDDOperationGraph. */
179180
private def makeDotNode(node: RDDOperationNode): String = {
180-
s"""${node.id} [label="${node.name} [${node.id}]"]"""
181+
val label = s"${node.name} [${node.id}]\n${node.callsite.shortForm}"
182+
s"""${node.id} [label="$label"]"""
181183
}
182184

183185
/** Update the dot representation of the RDDOperationGraph in cluster to subgraph. */

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ private[spark] object JsonProtocol {
398398
("RDD ID" -> rddInfo.id) ~
399399
("Name" -> rddInfo.name) ~
400400
("Scope" -> rddInfo.scope.map(_.toJson)) ~
401+
("Callsite" -> callsiteToJson(rddInfo.callSite)) ~
401402
("Parent IDs" -> parentIds) ~
402403
("Storage Level" -> storageLevel) ~
403404
("Number of Partitions" -> rddInfo.numPartitions) ~
@@ -407,6 +408,11 @@ private[spark] object JsonProtocol {
407408
("Disk Size" -> rddInfo.diskSize)
408409
}
409410

411+
def callsiteToJson(callsite: CallSite): JValue = {
412+
("Short Form" -> callsite.shortForm) ~
413+
("Long Form" -> callsite.longForm)
414+
}
415+
410416
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
411417
("Use Disk" -> storageLevel.useDisk) ~
412418
("Use Memory" -> storageLevel.useMemory) ~
@@ -851,6 +857,9 @@ private[spark] object JsonProtocol {
851857
val scope = Utils.jsonOption(json \ "Scope")
852858
.map(_.extract[String])
853859
.map(RDDOperationScope.fromJson)
860+
val callsite = Utils.jsonOption(json \ "Callsite")
861+
.map(callsiteFromJson)
862+
.getOrElse(CallSite.empty)
854863
val parentIds = Utils.jsonOption(json \ "Parent IDs")
855864
.map { l => l.extract[List[JValue]].map(_.extract[Int]) }
856865
.getOrElse(Seq.empty)
@@ -863,14 +872,20 @@ private[spark] object JsonProtocol {
863872
.getOrElse(json \ "Tachyon Size").extract[Long]
864873
val diskSize = (json \ "Disk Size").extract[Long]
865874

866-
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, scope)
875+
val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope)
867876
rddInfo.numCachedPartitions = numCachedPartitions
868877
rddInfo.memSize = memSize
869878
rddInfo.externalBlockStoreSize = externalBlockStoreSize
870879
rddInfo.diskSize = diskSize
871880
rddInfo
872881
}
873882

883+
def callsiteFromJson(json: JValue): CallSite = {
884+
val shortForm = (json \ "Short Form").extract[String]
885+
val longForm = (json \ "Long Form").extract[String]
886+
CallSite(shortForm, longForm)
887+
}
888+
874889
def storageLevelFromJson(json: JValue): StorageLevel = {
875890
val useDisk = (json \ "Use Disk").extract[Boolean]
876891
val useMemory = (json \ "Use Memory").extract[Boolean]

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ private[spark] case class CallSite(shortForm: String, longForm: String)
5757
private[spark] object CallSite {
5858
val SHORT_FORM = "callSite.short"
5959
val LONG_FORM = "callSite.long"
60+
val empty = CallSite("", "")
6061
}
6162

6263
/**

core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -615,29 +615,29 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
615615
assert(stage0.contains("digraph G {\n subgraph clusterstage_0 {\n " +
616616
"label=&quot;Stage 0&quot;;\n subgraph "))
617617
assert(stage0.contains("{\n label=&quot;parallelize&quot;;\n " +
618-
"0 [label=&quot;ParallelCollectionRDD [0]&quot;];\n }"))
618+
"0 [label=&quot;ParallelCollectionRDD [0]"))
619619
assert(stage0.contains("{\n label=&quot;map&quot;;\n " +
620-
"1 [label=&quot;MapPartitionsRDD [1]&quot;];\n }"))
620+
"1 [label=&quot;MapPartitionsRDD [1]"))
621621
assert(stage0.contains("{\n label=&quot;groupBy&quot;;\n " +
622-
"2 [label=&quot;MapPartitionsRDD [2]&quot;];\n }"))
622+
"2 [label=&quot;MapPartitionsRDD [2]"))
623623

624624
val stage1 = Source.fromURL(sc.ui.get.appUIAddress +
625625
"/stages/stage/?id=1&attempt=0&expandDagViz=true").mkString
626626
assert(stage1.contains("digraph G {\n subgraph clusterstage_1 {\n " +
627627
"label=&quot;Stage 1&quot;;\n subgraph "))
628628
assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
629-
"3 [label=&quot;ShuffledRDD [3]&quot;];\n }"))
629+
"3 [label=&quot;ShuffledRDD [3]"))
630630
assert(stage1.contains("{\n label=&quot;map&quot;;\n " +
631-
"4 [label=&quot;MapPartitionsRDD [4]&quot;];\n }"))
631+
"4 [label=&quot;MapPartitionsRDD [4]"))
632632
assert(stage1.contains("{\n label=&quot;groupBy&quot;;\n " +
633-
"5 [label=&quot;MapPartitionsRDD [5]&quot;];\n }"))
633+
"5 [label=&quot;MapPartitionsRDD [5]"))
634634

635635
val stage2 = Source.fromURL(sc.ui.get.appUIAddress +
636636
"/stages/stage/?id=2&attempt=0&expandDagViz=true").mkString
637637
assert(stage2.contains("digraph G {\n subgraph clusterstage_2 {\n " +
638638
"label=&quot;Stage 2&quot;;\n subgraph "))
639639
assert(stage2.contains("{\n label=&quot;groupBy&quot;;\n " +
640-
"6 [label=&quot;ShuffledRDD [6]&quot;];\n }"))
640+
"6 [label=&quot;ShuffledRDD [6]"))
641641
}
642642
}
643643

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class JsonProtocolSuite extends SparkFunSuite {
111111
test("Dependent Classes") {
112112
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
113113
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
114+
testCallsite(CallSite("happy", "birthday"))
114115
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
115116
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
116117
testTaskMetrics(makeTaskMetrics(
@@ -163,6 +164,10 @@ class JsonProtocolSuite extends SparkFunSuite {
163164
testBlockId(StreamBlockId(1, 2L))
164165
}
165166

167+
/* ============================== *
168+
| Backward compatibility tests |
169+
* ============================== */
170+
166171
test("ExceptionFailure backward compatibility") {
167172
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null,
168173
None, None)
@@ -334,14 +339,17 @@ class JsonProtocolSuite extends SparkFunSuite {
334339
assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
335340
}
336341

337-
test("RDDInfo backward compatibility (scope, parent IDs)") {
338-
// Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
339-
val rddInfo = new RDDInfo(
340-
1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), Some(new RDDOperationScope("fable")))
342+
test("RDDInfo backward compatibility (scope, parent IDs, callsite)") {
343+
// "Scope" and "Parent IDs" were introduced in Spark 1.4.0
344+
// "Callsite" was introduced in Spark 1.6.0
345+
val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8),
346+
CallSite("short", "long"), Some(new RDDOperationScope("fable")))
341347
val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
342348
.removeField({ _._1 == "Parent IDs"})
343349
.removeField({ _._1 == "Scope"})
344-
val expectedRddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq.empty, scope = None)
350+
.removeField({ _._1 == "Callsite"})
351+
val expectedRddInfo = new RDDInfo(
352+
1, "one", 100, StorageLevel.NONE, Seq.empty, CallSite.empty, scope = None)
345353
assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
346354
}
347355

@@ -389,6 +397,11 @@ class JsonProtocolSuite extends SparkFunSuite {
389397
assertEquals(info, newInfo)
390398
}
391399

400+
private def testCallsite(callsite: CallSite): Unit = {
401+
val newCallsite = JsonProtocol.callsiteFromJson(JsonProtocol.callsiteToJson(callsite))
402+
assert(callsite === newCallsite)
403+
}
404+
392405
private def testStageInfo(info: StageInfo) {
393406
val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
394407
assertEquals(info, newInfo)
@@ -713,7 +726,8 @@ class JsonProtocolSuite extends SparkFunSuite {
713726
}
714727

715728
private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
716-
val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7))
729+
val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK,
730+
Seq(1, 4, 7), CallSite(a.toString, b.toString))
717731
r.numCachedPartitions = c
718732
r.memSize = d
719733
r.diskSize = e
@@ -856,6 +870,7 @@ class JsonProtocolSuite extends SparkFunSuite {
856870
| {
857871
| "RDD ID": 101,
858872
| "Name": "mayor",
873+
| "Callsite": {"Short Form": "101", "Long Form": "201"},
859874
| "Parent IDs": [1, 4, 7],
860875
| "Storage Level": {
861876
| "Use Disk": true,
@@ -1258,6 +1273,7 @@ class JsonProtocolSuite extends SparkFunSuite {
12581273
| {
12591274
| "RDD ID": 1,
12601275
| "Name": "mayor",
1276+
| "Callsite": {"Short Form": "1", "Long Form": "200"},
12611277
| "Parent IDs": [1, 4, 7],
12621278
| "Storage Level": {
12631279
| "Use Disk": true,
@@ -1301,6 +1317,7 @@ class JsonProtocolSuite extends SparkFunSuite {
13011317
| {
13021318
| "RDD ID": 2,
13031319
| "Name": "mayor",
1320+
| "Callsite": {"Short Form": "2", "Long Form": "400"},
13041321
| "Parent IDs": [1, 4, 7],
13051322
| "Storage Level": {
13061323
| "Use Disk": true,
@@ -1318,6 +1335,7 @@ class JsonProtocolSuite extends SparkFunSuite {
13181335
| {
13191336
| "RDD ID": 3,
13201337
| "Name": "mayor",
1338+
| "Callsite": {"Short Form": "3", "Long Form": "401"},
13211339
| "Parent IDs": [1, 4, 7],
13221340
| "Storage Level": {
13231341
| "Use Disk": true,
@@ -1361,6 +1379,7 @@ class JsonProtocolSuite extends SparkFunSuite {
13611379
| {
13621380
| "RDD ID": 3,
13631381
| "Name": "mayor",
1382+
| "Callsite": {"Short Form": "3", "Long Form": "600"},
13641383
| "Parent IDs": [1, 4, 7],
13651384
| "Storage Level": {
13661385
| "Use Disk": true,
@@ -1378,6 +1397,7 @@ class JsonProtocolSuite extends SparkFunSuite {
13781397
| {
13791398
| "RDD ID": 4,
13801399
| "Name": "mayor",
1400+
| "Callsite": {"Short Form": "4", "Long Form": "601"},
13811401
| "Parent IDs": [1, 4, 7],
13821402
| "Storage Level": {
13831403
| "Use Disk": true,
@@ -1395,6 +1415,7 @@ class JsonProtocolSuite extends SparkFunSuite {
13951415
| {
13961416
| "RDD ID": 5,
13971417
| "Name": "mayor",
1418+
| "Callsite": {"Short Form": "5", "Long Form": "602"},
13981419
| "Parent IDs": [1, 4, 7],
13991420
| "Storage Level": {
14001421
| "Use Disk": true,
@@ -1438,6 +1459,7 @@ class JsonProtocolSuite extends SparkFunSuite {
14381459
| {
14391460
| "RDD ID": 4,
14401461
| "Name": "mayor",
1462+
| "Callsite": {"Short Form": "4", "Long Form": "800"},
14411463
| "Parent IDs": [1, 4, 7],
14421464
| "Storage Level": {
14431465
| "Use Disk": true,
@@ -1455,6 +1477,7 @@ class JsonProtocolSuite extends SparkFunSuite {
14551477
| {
14561478
| "RDD ID": 5,
14571479
| "Name": "mayor",
1480+
| "Callsite": {"Short Form": "5", "Long Form": "801"},
14581481
| "Parent IDs": [1, 4, 7],
14591482
| "Storage Level": {
14601483
| "Use Disk": true,
@@ -1472,6 +1495,7 @@ class JsonProtocolSuite extends SparkFunSuite {
14721495
| {
14731496
| "RDD ID": 6,
14741497
| "Name": "mayor",
1498+
| "Callsite": {"Short Form": "6", "Long Form": "802"},
14751499
| "Parent IDs": [1, 4, 7],
14761500
| "Storage Level": {
14771501
| "Use Disk": true,
@@ -1489,6 +1513,7 @@ class JsonProtocolSuite extends SparkFunSuite {
14891513
| {
14901514
| "RDD ID": 7,
14911515
| "Name": "mayor",
1516+
| "Callsite": {"Short Form": "7", "Long Form": "803"},
14921517
| "Parent IDs": [1, 4, 7],
14931518
| "Storage Level": {
14941519
| "Use Disk": true,

0 commit comments

Comments
 (0)