Skip to content

Commit 3009088

Browse files
committed
[SPARK-8880] Fix confusing Stage.attemptId member variable
Author: Kay Ousterhout <[email protected]>

Closes #7275 from kayousterhout/SPARK-8880 and squashes the following commits:

3e9ce7c [Kay Ousterhout] Added missing return type
e150278 [Kay Ousterhout] [SPARK-8880] Fix confusing Stage.attemptId member variable
1 parent c472eb1 commit 3009088

File tree

3 files changed

+18
-12
lines changed

3 files changed

+18
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -872,7 +872,7 @@ class DAGScheduler(
872872
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
873873
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
874874
// event.
875-
stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
875+
stage.makeNewStageAttempt(partitionsToCompute.size)
876876
outputCommitCoordinator.stageStart(stage.id)
877877
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
878878

@@ -937,8 +937,8 @@ class DAGScheduler(
937937
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
938938
stage.pendingTasks ++= tasks
939939
logDebug("New pending tasks: " + stage.pendingTasks)
940-
taskScheduler.submitTasks(
941-
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
940+
taskScheduler.submitTasks(new TaskSet(
941+
tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
942942
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
943943
} else {
944944
// Because we posted SparkListenerStageSubmitted earlier, we should mark

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,22 +62,28 @@ private[spark] abstract class Stage(
6262

6363
var pendingTasks = new HashSet[Task[_]]
6464

65+
/** The ID to use for the next new attempt for this stage. */
6566
private var nextAttemptId: Int = 0
6667

6768
val name = callSite.shortForm
6869
val details = callSite.longForm
6970

70-
/** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
71-
var latestInfo: StageInfo = StageInfo.fromStage(this)
71+
/**
72+
* Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
73+
* here, before any attempts have actually been created, because the DAGScheduler uses this
74+
* StageInfo to tell SparkListeners when a job starts (which happens before any stage attempts
75+
* have been created).
76+
*/
77+
private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
7278

73-
/** Return a new attempt id, starting with 0. */
74-
def newAttemptId(): Int = {
75-
val id = nextAttemptId
79+
/** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
80+
def makeNewStageAttempt(numPartitionsToCompute: Int): Unit = {
81+
_latestInfo = StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute))
7682
nextAttemptId += 1
77-
id
7883
}
7984

80-
def attemptId: Int = nextAttemptId
85+
/** Returns the StageInfo for the most recent attempt for this stage. */
86+
def latestInfo: StageInfo = _latestInfo
8187

8288
override final def hashCode(): Int = id
8389
override final def equals(other: Any): Boolean = other match {

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ private[spark] object StageInfo {
7070
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
7171
* sequence of narrow dependencies should also be associated with this Stage.
7272
*/
73-
def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
73+
def fromStage(stage: Stage, attemptId: Int, numTasks: Option[Int] = None): StageInfo = {
7474
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
7575
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
7676
new StageInfo(
7777
stage.id,
78-
stage.attemptId,
78+
attemptId,
7979
stage.name,
8080
numTasks.getOrElse(stage.numTasks),
8181
rddInfos,

0 commit comments

Comments
 (0)