Skip to content

Commit d00a85b

Browse files
pwendell and darabos
authored and committed
Make call sites for stages non-optional and well defined
1 parent b9eba24 commit d00a85b

File tree

2 files changed

+12
-7
lines changed

2 files changed

+12
-7
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ class DAGScheduler(
195195
case Some(stage) => stage
196196
case None =>
197197
val stage =
198-
newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
198+
newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId, shuffleDep.rdd.creationSite)
199199
shuffleToMapStage(shuffleDep.shuffleId) = stage
200200
stage
201201
}
@@ -212,7 +212,7 @@ class DAGScheduler(
212212
numTasks: Int,
213213
shuffleDep: Option[ShuffleDependency[_,_]],
214214
jobId: Int,
215-
callSite: Option[CallSite] = None)
215+
callSite: CallSite)
216216
: Stage =
217217
{
218218
val id = nextStageId.getAndIncrement()
@@ -235,7 +235,7 @@ class DAGScheduler(
235235
numTasks: Int,
236236
shuffleDep: ShuffleDependency[_,_],
237237
jobId: Int,
238-
callSite: Option[CallSite] = None)
238+
callSite: CallSite)
239239
: Stage =
240240
{
241241
val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
@@ -674,7 +674,7 @@ class DAGScheduler(
674674
try {
675675
// New stage creation may throw an exception if, for example, jobs are run on a
676676
// HadoopRDD whose underlying HDFS files have been deleted.
677-
finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
677+
finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
678678
} catch {
679679
case e: Exception =>
680680
logWarning("Creating new stage failed due to exception - job: " + jobId, e)

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ import org.apache.spark.util.CallSite
3636
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
3737
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
3838
* faster on failure.
39+
*
40+
* The callSite provides a location in user code which relates to the stage. For a shuffle map
41+
* stage, the callSite gives the user code that created the RDD being shuffled. For a result
42+
* stage, the callSite gives the user code that executes the associated action (e.g. count()).
43+
*
3944
*/
4045
private[spark] class Stage(
4146
val id: Int,
@@ -44,7 +49,7 @@ private[spark] class Stage(
4449
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
4550
val parents: List[Stage],
4651
val jobId: Int,
47-
callSite: Option[CallSite])
52+
val callSite: CallSite)
4853
extends Logging {
4954

5055
val isShuffleMap = shuffleDep.isDefined
@@ -101,8 +106,8 @@ private[spark] class Stage(
101106
id
102107
}
103108

104-
val name = callSite.map(_.short).getOrElse(rdd.getCreationSite)
105-
val details = callSite.map(_.long).getOrElse("")
109+
val name = callSite.short
110+
val details = callSite.long
106111

107112
override def toString = "Stage " + id
108113

0 commit comments

Comments (0)