
Commit 23a12ce

darabos authored and pwendell committed
SPARK-2035: Store call stack for stages, display it on the UI.
I'm not sure about the test -- I get a lot of unrelated failures for some reason. I'll try to sort it out. But hopefully the automation will test this for me if I send a pull request :). I'll attach a demo HTML in [Jira](https://issues.apache.org/jira/browse/SPARK-2035).

Author: Daniel Darabos <[email protected]>
Author: Patrick Wendell <[email protected]>

Closes apache#981 from darabos/darabos-call-stack and squashes the following commits:

f7c6bfa [Daniel Darabos] Fix bad merge. I undid 83c226d by Doris.
3d0a48d [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
b857849 [Daniel Darabos] Style: Break long line.
ecb5690 [Daniel Darabos] Include the last Spark method in the full stack trace. Otherwise it is not visible if the stage name is overridden.
d00a85b [Patrick Wendell] Make call sites for stages non-optional and well defined
b9eba24 [Daniel Darabos] Make StageInfo.details non-optional. Add JSON serialization code for the new field. Verify JSON backward compatibility.
4312828 [Daniel Darabos] Remove Mima excludes for CallSite. They should be unnecessary now, with SPARK-2070 fixed.
0920750 [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
a4b1faf [Daniel Darabos] Add Mima exclusions for the CallSite changes it has picked up. They are private methods/classes, so we ought to be safe.
932f810 [Daniel Darabos] Use empty CallSite instead of null in DAGSchedulerSuite. Outside of testing, this parameter always originates in SparkContext.scala, and will never be null.
ccd89d1 [Daniel Darabos] Fix long lines.
ac173e4 [Daniel Darabos] Hide "show details" if there are no details to show.
6182da6 [Daniel Darabos] Set a configurable limit on maximum call stack depth. It can be useful in memory-constrained situations with large numbers of stages.
8fe2e34 [Daniel Darabos] Store call stack for stages, display it on the UI.
1 parent 8cd04c3 commit 23a12ce
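
For illustration, a hedged sketch of what this change surfaces in the UI (the driver program, file name, and line numbers below are hypothetical, not part of this commit). Running an action now records a short call site that becomes the stage name, plus the full user stack trace that the stage table shows behind a collapsible "show details" link:

    // Hypothetical driver program; names and line numbers are illustrative only.
    object CallSiteDemo {
      def main(args: Array[String]): Unit = {
        val sc = new org.apache.spark.SparkContext("local", "callsite-demo")
        // The result stage is named by the short call site, e.g. "count at CallSiteDemo.scala:5".
        sc.parallelize(1 to 100).map(_ * 2).count()
        // The stage table additionally keeps the full stack trace leading to count(),
        // rendered in a collapsed <pre class="stage-details"> block.
        sc.stop()
      }
    }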

File tree

15 files changed: +115 additions, −53 deletions


core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 21 additions & 0 deletions
@@ -87,3 +87,24 @@ span.kill-link {
 span.kill-link a {
   color: gray;
 }
+
+span.expand-details {
+  font-size: 10pt;
+  cursor: pointer;
+  color: grey;
+  float: right;
+}
+
+.stage-details {
+  max-height: 100px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 10 additions & 8 deletions
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -1036,9 +1036,11 @@ class SparkContext(config: SparkConf) extends Logging {
   * Capture the current user callsite and return a formatted version for printing. If the user
   * has overridden the call site, this will return the user's version.
   */
-  private[spark] def getCallSite(): String = {
-    val defaultCallSite = Utils.getCallSiteInfo
-    Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+  private[spark] def getCallSite(): CallSite = {
+    Option(getLocalProperty("externalCallSite")) match {
+      case Some(callSite) => CallSite(callSite, long = "")
+      case None => Utils.getCallSite
+    }
   }

   /**
@@ -1058,11 +1060,11 @@
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }

@@ -1143,11 +1145,11 @@
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
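
The CallSite type used above is defined in Utils.scala, which is not included in this excerpt. A minimal sketch of what the diff assumes, inferred from the CallSite(callSite, long = "") construction and the .short/.long accessors (the actual definition may differ):

    package org.apache.spark.util

    // Sketch only: holder for a call site captured from user code. `short` is the one-line
    // location used for stage names and log messages; `long` is the full user stack trace
    // shown behind the UI's "+show details" link.
    private[spark] case class CallSite(short: String, long: String)

Note that when the call site is overridden via the "externalCallSite" local property, getCallSite returns CallSite(callSite, long = ""), so such stages carry an empty details string and the UI hides the "show details" link for them (per commit ac173e4 above).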

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 3 additions & 3 deletions
@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}

@@ -1189,8 +1189,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE

   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
-  private[spark] def getCreationSite: String = Option(creationSiteInfo).getOrElse("").toString
+  @transient private[spark] val creationSite = Utils.getCallSite
+  private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")

   private[spark] def elementClassTag: ClassTag[T] = classTag[T]

core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala

Lines changed: 2 additions & 1 deletion
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.Properties

 import org.apache.spark.TaskContext
+import org.apache.spark.util.CallSite

 /**
  * Tracks information about an active job in the DAGScheduler.
@@ -29,7 +30,7 @@ private[spark] class ActiveJob(
     val finalStage: Stage,
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
-    val callSite: String,
+    val callSite: CallSite,
     val listener: JobListener,
     val properties: Properties) {

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 13 additions & 11 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}

 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -195,7 +195,9 @@ class DAGScheduler(
       case Some(stage) => stage
       case None =>
         val stage =
-          newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
+          newOrUsedStage(
+            shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
+            shuffleDep.rdd.creationSite)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
         stage
     }
@@ -212,7 +214,7 @@
       numTasks: Int,
       shuffleDep: Option[ShuffleDependency[_, _, _]],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: CallSite)
     : Stage =
   {
     val id = nextStageId.getAndIncrement()
@@ -235,7 +237,7 @@
       numTasks: Int,
       shuffleDep: ShuffleDependency[_, _, _],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: CallSite)
     : Stage =
   {
     val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
@@ -413,7 +415,7 @@
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null): JobWaiter[U] =
@@ -443,7 +445,7 @@
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null)
@@ -452,7 +454,7 @@
     waiter.awaitResult() match {
       case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
-        logInfo("Failed to run " + callSite)
+        logInfo("Failed to run " + callSite.short)
         throw exception
     }
   }
@@ -461,7 +463,7 @@
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
-      callSite: String,
+      callSite: CallSite,
       timeout: Long,
       properties: Properties = null)
     : PartialResult[R] =
@@ -666,15 +668,15 @@
       func: (TaskContext, Iterator[_]) => _,
       partitions: Array[Int],
       allowLocal: Boolean,
-      callSite: String,
+      callSite: CallSite,
       listener: JobListener,
       properties: Properties = null)
   {
     var finalStage: Stage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
       // HadoopRDD whose underlying HDFS files have been deleted.
-      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
     } catch {
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
@@ -685,7 +687,7 @@
     val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
     clearCacheLocs()
     logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-      job.jobId, callSite, partitions.length, allowLocal))
+      job.jobId, callSite.short, partitions.length, allowLocal))
     logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
     logInfo("Parents of final stage: " + finalStage.parents)
     logInfo("Missing parents: " + getMissingParentStages(finalStage))

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala

Lines changed: 2 additions & 1 deletion
@@ -25,6 +25,7 @@ import scala.language.existentials
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CallSite

 /**
  * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -40,7 +41,7 @@ private[scheduler] case class JobSubmitted(
     func: (TaskContext, Iterator[_]) => _,
     partitions: Array[Int],
     allowLocal: Boolean,
-    callSite: String,
+    callSite: CallSite,
     listener: JobListener,
     properties: Properties = null)
   extends DAGSchedulerEvent

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 9 additions & 2 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.CallSite

 /**
  * A stage is a set of independent tasks all computing the same function that need to run as part
@@ -35,6 +36,11 @@ import org.apache.spark.storage.BlockManagerId
  * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
  * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
  * faster on failure.
+ *
+ * The callSite provides a location in user code which relates to the stage. For a shuffle map
+ * stage, the callSite gives the user code that created the RDD being shuffled. For a result
+ * stage, the callSite gives the user code that executes the associated action (e.g. count()).
+ *
  */
 private[spark] class Stage(
     val id: Int,
@@ -43,7 +49,7 @@ private[spark] class Stage(
     val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage
     val parents: List[Stage],
     val jobId: Int,
-    callSite: Option[String])
+    val callSite: CallSite)
   extends Logging {

   val isShuffleMap = shuffleDep.isDefined
@@ -100,7 +106,8 @@ private[spark] class Stage(
     id
   }

-  val name = callSite.getOrElse(rdd.getCreationSite)
+  val name = callSite.short
+  val details = callSite.long

   override def toString = "Stage " + id
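
To make the documented callSite semantics concrete, a hedged example (paths and call-site locations are hypothetical, and the snippet assumes an existing SparkContext):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    // Illustrative only; not part of this commit.
    def demo(sc: SparkContext): Unit = {
      val pairs  = sc.textFile("hdfs:///logs").map(line => (line.split(" ")(0), 1)) // shuffled RDD created here
      val counts = pairs.reduceByKey(_ + _)                                         // introduces a shuffle of `pairs`
      counts.collect()                                                              // action that triggers the job
      // Shuffle map stage: name/details come from the creation site of the shuffled RDD ("map at ...").
      // Result stage: name/details come from the action's call site ("collect at ...").
    }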

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 7 additions & 2 deletions
@@ -25,7 +25,12 @@ import org.apache.spark.storage.RDDInfo
  * Stores information about a stage to pass from the scheduler to SparkListeners.
  */
 @DeveloperApi
-class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
+class StageInfo(
+    val stageId: Int,
+    val name: String,
+    val numTasks: Int,
+    val rddInfos: Seq[RDDInfo],
+    val details: String) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -52,6 +57,6 @@ private[spark] object StageInfo {
   def fromStage(stage: Stage): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
+    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
   }
 }

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 9 additions & 1 deletion
@@ -91,9 +91,17 @@ private[ui] class StageTableBase(
         {s.name}
       </a>

+    val details = if (s.details.nonEmpty) (
+      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +show details
+      </span>
+      <pre class="stage-details collapsed">{s.details}</pre>
+    )
+
     listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
-      .getOrElse(<div> {killLink}{nameLink}</div>)
+      .getOrElse(<div>{killLink} {nameLink} {details}</div>)
   }

   protected def stageRow(s: StageInfo): Seq[Node] = {

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 3 additions & 1 deletion
@@ -184,6 +184,7 @@ private[spark] object JsonProtocol {
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
+    ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
@@ -469,12 +470,13 @@ private[spark] object JsonProtocol {
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+    val details = (json \ "Details").extractOpt[String].getOrElse("")
    val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
     val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]

-    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos)
+    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
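
The extractOpt fallback above is what verifies JSON backward compatibility: an event written before this change has no "Details" field, so the new field simply deserializes to the empty string. A minimal standalone sketch of that behaviour using json4s (not Spark code; the JSON shown is a hypothetical subset of a real StageInfo event):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    implicit val formats: Formats = DefaultFormats

    // A pre-upgrade event: no "Details" key is present.
    val oldEvent = parse("""{"Stage ID": 1, "Stage Name": "count at Foo.scala:10", "Number of Tasks": 4}""")
    // extractOpt yields None for the missing key, so we fall back to "".
    val details = (oldEvent \ "Details").extractOpt[String].getOrElse("")
    assert(details == "")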
