Commit 8fe2e34

Store call stack for stages, display it on the UI.

1 parent b8d2580 commit 8fe2e34

11 files changed: 76 additions & 44 deletions
core/src/main/resources/org/apache/spark/ui/static/webui.css
Lines changed: 21 additions & 0 deletions

@@ -87,3 +87,24 @@ span.kill-link {
 span.kill-link a {
   color: gray;
 }
+
+span.expand-details {
+  font-size: 10pt;
+  cursor: pointer;
+  color: grey;
+  float: right;
+}
+
+.stage-details {
+  max-height: 100px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
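
A note on the technique: CSS cannot animate height: auto, so the collapse transitions max-height instead. An expanded .stage-details box is capped at 100px and scrolls past that, while the collapsed state also zeroes the vertical padding and border, letting the call stack slide open and shut over 0.5s with no JavaScript animation code.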

core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 10 additions & 8 deletions

@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -1020,9 +1020,11 @@ class SparkContext(config: SparkConf) extends Logging {
    * Capture the current user callsite and return a formatted version for printing. If the user
    * has overridden the call site, this will return the user's version.
    */
-  private[spark] def getCallSite(): String = {
-    val defaultCallSite = Utils.getCallSiteInfo
-    Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+  private[spark] def getCallSite(): CallSite = {
+    Option(getLocalProperty("externalCallSite")) match {
+      case Some(callSite) => CallSite(callSite, long = "")
+      case None => Utils.getCallSite
+    }
   }

   /**
@@ -1042,11 +1044,11 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }

@@ -1127,11 +1129,11 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
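
For context, getCallSite() now prefers a caller-supplied label: whatever is stored under the externalCallSite local property wins, and it arrives with an empty long form because no real stack was captured for it. A minimal sketch of how user code might exercise this, assuming an existing SparkContext named sc (the label and path are illustrative, not part of this commit):

    // Override the call site for jobs submitted from this thread.
    sc.setLocalProperty("externalCallSite", "loadEvents at Ingest.scala:17")
    val events = sc.textFile("hdfs://host/events")  // illustrative path
    events.count()  // logs: Starting job: loadEvents at Ingest.scala:17
    // Remove the override; getCallSite() falls back to Utils.getCallSite.
    sc.setLocalProperty("externalCallSite", null)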

core/src/main/scala/org/apache/spark/rdd/RDD.scala
Lines changed: 3 additions & 3 deletions

@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

@@ -1179,8 +1179,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE

   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
-  private[spark] def getCreationSite: String = creationSiteInfo.toString
+  @transient private[spark] val creationSite = Utils.getCallSite
+  private[spark] def getCreationSite: String = creationSite.short

   private[spark] def elementClassTag: ClassTag[T] = classTag[T]

core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
Lines changed: 2 additions & 1 deletion

@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.Properties

 import org.apache.spark.TaskContext
+import org.apache.spark.util.CallSite

 /**
  * Tracks information about an active job in the DAGScheduler.
@@ -29,7 +30,7 @@ private[spark] class ActiveJob(
     val finalStage: Stage,
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
-    val callSite: String,
+    val callSite: CallSite,
     val listener: JobListener,
     val properties: Properties) {

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines changed: 9 additions & 9 deletions

@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallSite, Utils}

 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -211,7 +211,7 @@ class DAGScheduler(
       numTasks: Int,
       shuffleDep: Option[ShuffleDependency[_,_]],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: Option[CallSite] = None)
     : Stage =
   {
     val id = nextStageId.getAndIncrement()
@@ -234,7 +234,7 @@ class DAGScheduler(
       numTasks: Int,
       shuffleDep: ShuffleDependency[_,_],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: Option[CallSite] = None)
     : Stage =
   {
     val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
@@ -412,7 +412,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null): JobWaiter[U] =
@@ -442,7 +442,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null)
@@ -451,7 +451,7 @@ class DAGScheduler(
     waiter.awaitResult() match {
       case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
-        logInfo("Failed to run " + callSite)
+        logInfo("Failed to run " + callSite.short)
         throw exception
     }
   }
@@ -460,7 +460,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
-      callSite: String,
+      callSite: CallSite,
       timeout: Long,
       properties: Properties = null)
     : PartialResult[R] =
@@ -665,7 +665,7 @@ class DAGScheduler(
       func: (TaskContext, Iterator[_]) => _,
       partitions: Array[Int],
       allowLocal: Boolean,
-      callSite: String,
+      callSite: CallSite,
       listener: JobListener,
       properties: Properties = null)
   {
@@ -684,7 +684,7 @@ class DAGScheduler(
     val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
     clearCacheLocs()
     logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-      job.jobId, callSite, partitions.length, allowLocal))
+      job.jobId, callSite.short, partitions.length, allowLocal))
     logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
     logInfo("Parents of final stage: " + finalStage.parents)
     logInfo("Missing parents: " + getMissingParentStages(finalStage))

core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
Lines changed: 2 additions & 1 deletion

@@ -25,6 +25,7 @@ import scala.language.existentials
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CallSite

 /**
  * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -40,7 +41,7 @@ private[scheduler] case class JobSubmitted(
     func: (TaskContext, Iterator[_]) => _,
     partitions: Array[Int],
     allowLocal: Boolean,
-    callSite: String,
+    callSite: CallSite,
     listener: JobListener,
     properties: Properties = null)
   extends DAGSchedulerEvent

core/src/main/scala/org/apache/spark/scheduler/Stage.scala
Lines changed: 4 additions & 2 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.CallSite

 /**
  * A stage is a set of independent tasks all computing the same function that need to run as part
@@ -43,7 +44,7 @@ private[spark] class Stage(
     val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
     val parents: List[Stage],
     val jobId: Int,
-    callSite: Option[String])
+    callSite: Option[CallSite])
   extends Logging {

   val isShuffleMap = shuffleDep.isDefined
@@ -100,7 +101,8 @@
     id
   }

-  val name = callSite.getOrElse(rdd.getCreationSite)
+  val name = callSite.map(_.short).getOrElse(rdd.getCreationSite)
+  val details = callSite.map(_.long).getOrElse("")

   override def toString = "Stage " + id
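
To make the split concrete, a hedged sketch of the two fields a Stage derives from its call site; the CallSite values are invented for illustration:

    // name feeds the stage table, details feeds the collapsible stack view.
    val cs = CallSite(
      short = "map at WordCount.scala:12",
      long = Seq(
        "org.apache.spark.rdd.RDD.map(RDD.scala:270)",
        "WordCount$.main(WordCount.scala:12)").mkString("\n"))
    val name = Some(cs).map(_.short).getOrElse("(unknown)")  // "map at WordCount.scala:12"
    val details = Some(cs).map(_.long).getOrElse("")         // the two frames above

When callSite is None, the name falls back to where the RDD was created and details stays empty, so such stages render an empty details pane.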

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
Lines changed: 2 additions & 2 deletions

@@ -25,7 +25,7 @@ import org.apache.spark.storage.RDDInfo
  * Stores information about a stage to pass from the scheduler to SparkListeners.
  */
 @DeveloperApi
-class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
+class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo], val details: String = "") {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -52,6 +52,6 @@ private[spark] object StageInfo {
   def fromStage(stage: Stage): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
+    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
   }
 }

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Lines changed: 8 additions & 1 deletion

@@ -91,9 +91,16 @@ private[ui] class StageTableBase(
         {s.name}
       </a>

+    val details = (
+      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')" class="expand-details">
+        +show details
+      </span>
+      <pre class="stage-details collapsed">{s.details}</pre>
+    )
+
     listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
-      .getOrElse(<div> {killLink}{nameLink}</div>)
+      .getOrElse(<div>{killLink} {nameLink} {details}</div>)
   }

   protected def stageRow(s: StageInfo): Seq[Node] = {
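
The inline onclick handler is the only script involved: it toggles the collapsed class on the sibling <pre class="stage-details">, and the transitions added to webui.css above animate the stack trace open and closed. Stages carrying a description keep their two-line layout, while the default row now renders the kill link, the name link, and the details toggle.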

core/src/main/scala/org/apache/spark/util/Utils.scala
Lines changed: 14 additions & 16 deletions

@@ -43,6 +43,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

+/** CallSite represents a place in user code. It can have a short and a long form. */
+private[spark] case class CallSite(val short: String, val long: String)
+
 /**
  * Various utility methods used by Spark.
  */
@@ -799,21 +802,12 @@
    */
   private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r

-  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
-    val firstUserLine: Int, val firstUserClass: String) {
-
-    /** Returns a printable version of the call site info suitable for logs. */
-    override def toString = {
-      "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
-    }
-  }
-
   /**
    * When called inside a class in the spark package, returns the name of the user code class
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
    */
-  def getCallSiteInfo: CallSiteInfo = {
+  def getCallSite: CallSite = {
     val trace = Thread.currentThread.getStackTrace()
       .filterNot(_.getMethodName.contains("getStackTrace"))

@@ -824,11 +818,11 @@
     var lastSparkMethod = "<unknown>"
     var firstUserFile = "<unknown>"
     var firstUserLine = 0
-    var finished = false
-    var firstUserClass = "<unknown>"
+    var insideSpark = true
+    var userCallStack = new ArrayBuffer[String]

     for (el <- trace) {
-      if (!finished) {
+      if (insideSpark) {
         if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
@@ -839,12 +833,16 @@
         } else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
-          firstUserClass = el.getClassName
-          finished = true
+          userCallStack += el.toString
+          insideSpark = false
         }
+      } else {
+        userCallStack += el.toString
       }
     }
-    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
+    CallSite(
+      short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+      long = userCallStack.mkString("\n"))
   }

   /** Return a string containing part of a file from byte 'start' to 'end'. */
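
As a rough illustration (invented frames, not captured output) of what the rewritten getCallSite returns when a user's main reaches an RDD operation through one helper method:

    // short: the one-liner used for stage names and log messages.
    // long:  every frame from the first user frame downward, one per line.
    CallSite(
      short = "textFile at App.scala:14",
      long = Seq(
        "com.example.App$.run(App.scala:14)",
        "com.example.App$.main(App.scala:9)").mkString("\n"))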
