
Commit 06fd842

advancedxy authored and cloud-fan committed
[SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber
## What changes were proposed in this pull request?

1. Deprecate attemptId in StageInfo and add `def attemptNumber() = attemptId`
2. Replace usage of stageAttemptId with stageAttemptNumber

## How was this patch tested?

I manually checked the compiler warning info.

Author: Xianjin YE <[email protected]>

Closes #20178 from advancedxy/SPARK-22952.

(cherry picked from commit 40b983c)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 4a45f0a commit 06fd842

File tree

9 files changed: +51, -41 lines
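Before the per-file diffs, here is the pattern the commit applies, as a minimal standalone sketch (hypothetical class name, not the Spark source): keep the old `val` so existing callers still compile, mark it `@deprecated` so they get a compiler warning, and expose the same value under the clearer name.

```scala
// Minimal sketch of the deprecation-alias pattern used by this commit.
// The class name is hypothetical; StageInfo itself is shown in the diff below.
class AttemptInfoSketch(
    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int) {

  // Preferred accessor; simply forwards to the deprecated field.
  def attemptNumber(): Int = attemptId
}
```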

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 8 additions & 7 deletions
@@ -815,7 +815,8 @@ class DAGScheduler(
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
     // Note that there is a chance that this task is launched after the stage is cancelled.
     // In that case, we wouldn't have the stage anymore in stageIdToStage.
-    val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+    val stageAttemptId =
+      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
   }
@@ -1050,7 +1051,7 @@ class DAGScheduler(
           val locs = taskIdToLocations(id)
           val part = stage.rdd.partitions(id)
           stage.pendingPartitions += id
-          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
+          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
             taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
             Option(sc.applicationId), sc.applicationAttemptId)
         }
@@ -1060,7 +1061,7 @@ class DAGScheduler(
           val p: Int = stage.partitions(id)
           val part = stage.rdd.partitions(p)
           val locs = taskIdToLocations(id)
-          new ResultTask(stage.id, stage.latestInfo.attemptId,
+          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
             taskBinary, part, locs, id, properties, serializedTaskMetrics,
             Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
         }
@@ -1076,7 +1077,7 @@ class DAGScheduler(
       logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
         s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
-        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
+        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
       // the stage as completed here in case there are no tasks to run
@@ -1245,7 +1246,7 @@ class DAGScheduler(
         val status = event.result.asInstanceOf[MapStatus]
         val execId = status.location.executorId
         logDebug("ShuffleMapTask finished on " + execId)
-        if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
+        if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
           // This task was for the currently running attempt of the stage. Since the task
           // completed successfully from the perspective of the TaskSetManager, mark it as
           // no longer pending (the TaskSetManager may consider the task complete even
@@ -1324,10 +1325,10 @@ class DAGScheduler(
         val failedStage = stageIdToStage(task.stageId)
         val mapStage = shuffleIdToMapStage(shuffleId)

-        if (failedStage.latestInfo.attemptId != task.stageAttemptId) {
+        if (failedStage.latestInfo.attemptNumber != task.stageAttemptId) {
           logInfo(s"Ignoring fetch failure from $task as it's from $failedStage attempt" +
             s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
-            s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
+            s"(attempt ${failedStage.latestInfo.attemptNumber}) running")
         } else {
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 3 additions & 1 deletion
@@ -30,7 +30,7 @@ import org.apache.spark.storage.RDDInfo
 @DeveloperApi
 class StageInfo(
     val stageId: Int,
-    val attemptId: Int,
+    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
@@ -56,6 +56,8 @@ class StageInfo(
     completionTime = Some(System.currentTimeMillis)
   }

+  def attemptNumber(): Int = attemptId
+
   private[spark] def getStatusString: String = {
     if (completionTime.isDefined) {
       if (failureReason.isDefined) {
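The StageInfo change above is the commit's only public-API surface. A caller migrates as in the hedged sketch below (hypothetical listener; compiling the old line with -deprecation is what produces the warning the author checked manually):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Hypothetical listener showing the caller-side migration. With -deprecation
// enabled, the commented-out line would emit a deprecation warning pointing
// at attemptId in StageInfo.
class StageLoggingListener extends SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // val attempt = info.attemptId   // deprecated as of 2.3.0
    val attempt = info.attemptNumber  // preferred replacement, same value
    println(s"Stage ${info.stageId} (attempt $attempt) '${info.name}' completed")
  }
}
```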

core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ class StatsReportListener extends SparkListener with Logging {
       x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
     ).getOrElse("-")

-    s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
+    s"Stage(${info.stageId}, ${info.attemptNumber}); Name: '${info.name}'; " +
       s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
       s"Took: $timeTaken msec"
   }

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 4 additions & 3 deletions
@@ -529,7 +529,8 @@ private[spark] class AppStatusListener(
   }

   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
-    val maybeStage = Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)))
+    val maybeStage =
+      Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
     maybeStage.foreach { stage =>
       val now = System.nanoTime()
       stage.info = event.stageInfo
@@ -785,7 +786,7 @@ private[spark] class AppStatusListener(
   }

   private def getOrCreateStage(info: StageInfo): LiveStage = {
-    val stage = liveStages.computeIfAbsent((info.stageId, info.attemptId),
+    val stage = liveStages.computeIfAbsent((info.stageId, info.attemptNumber),
       new Function[(Int, Int), LiveStage]() {
         override def apply(key: (Int, Int)): LiveStage = new LiveStage()
       })
@@ -912,7 +913,7 @@ private[spark] class AppStatusListener(
   private def cleanupTasks(stage: LiveStage): Unit = {
     val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
     if (countToDelete > 0) {
      val stageKey = Array(stage.info.stageId, stage.info.attemptId)
      val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)
       val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
         .last(stageKey)
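These listener changes all touch the same keying scheme: a stage can run more than once (for example after the fetch failure handled in the DAGScheduler hunk above), so live state is keyed by the (stageId, attemptNumber) pair rather than the stage ID alone. A small sketch of that scheme, with made-up values:

```scala
import java.util.concurrent.ConcurrentHashMap

// Sketch with made-up values, not Spark code: keying by (stageId, attemptNumber)
// lets two attempts of the same stage be tracked independently, as liveStages is.
object StageKeySketch extends App {
  val liveStages = new ConcurrentHashMap[(Int, Int), String]()
  liveStages.put((3, 0), "first attempt")
  liveStages.put((3, 1), "retry after a fetch failure")
  assert(liveStages.size == 2) // stage 3 is tracked once per attempt
}
```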

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 2 additions & 2 deletions
@@ -412,14 +412,14 @@ private class LiveStage extends LiveEntity {

   def executorSummary(executorId: String): LiveExecutorStageSummary = {
     executorSummaries.getOrElseUpdate(executorId,
-      new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId))
+      new LiveExecutorStageSummary(info.stageId, info.attemptNumber, executorId))
   }

   def toApi(): v1.StageData = {
     new v1.StageData(
       status,
       info.stageId,
-      info.attemptId,
+      info.attemptNumber,

       info.numTasks,
       activeTasks,

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ private[spark] object RDDOperationGraph extends Logging {
     // Use a special prefix here to differentiate this cluster from other operation clusters
     val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
     val stageClusterName = s"Stage ${stage.stageId}" +
-      { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
+      { if (stage.attemptNumber == 0) "" else s" (attempt ${stage.attemptNumber})" }
     val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)

     var rootNodeCount = 0

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 1 addition & 1 deletion
@@ -263,7 +263,7 @@ private[spark] object JsonProtocol {
     val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
     val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
     ("Stage ID" -> stageInfo.stageId) ~
-    ("Stage Attempt ID" -> stageInfo.attemptId) ~
+    ("Stage Attempt ID" -> stageInfo.attemptNumber) ~
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~

core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala

Lines changed: 30 additions & 24 deletions
@@ -195,7 +195,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

     val s1Tasks = createTasks(4, execIds)
     s1Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId,
+        stages.head.attemptNumber,
+        task))
     }

     assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
@@ -213,10 +215,11 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     check[TaskDataWrapper](task.taskId) { wrapper =>
       assert(wrapper.info.taskId === task.taskId)
       assert(wrapper.stageId === stages.head.stageId)
-      assert(wrapper.stageAttemptId === stages.head.attemptId)
-      assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId)))
+      assert(wrapper.stageAttemptId === stages.head.attemptNumber)
+      assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptNumber)))

-      val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger,
+      val runtime = Array[AnyRef](stages.head.stageId: JInteger,
+        stages.head.attemptNumber: JInteger,
         -1L: JLong)
       assert(Arrays.equals(wrapper.runtime, runtime))
@@ -237,7 +240,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
         Some(1L), None, true, false, None)
       listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
         task.executorId,
-        Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum)))))
+        Seq((task.taskId, stages.head.stageId, stages.head.attemptNumber, Seq(accum)))))
     }

     check[StageDataWrapper](key(stages.head)) { stage =>
@@ -254,12 +257,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     // Fail one of the tasks, re-start it.
     time += 1
     s1Tasks.head.markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
       "taskType", TaskResultLost, s1Tasks.head, null))

     time += 1
     val reattempt = newAttempt(s1Tasks.head, nextTaskId())
-    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
       reattempt))

     assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1)
@@ -289,7 +292,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val killed = s1Tasks.drop(1).head
     killed.finishTime = time
     killed.failed = true
-    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
       "taskType", TaskKilled("killed"), killed, null))

     check[JobDataWrapper](1) { job =>
@@ -311,13 +314,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     val denied = newAttempt(killed, nextTaskId())
     val denyReason = TaskCommitDenied(1, 1, 1)
-    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
       denied))

     time += 1
     denied.finishTime = time
     denied.failed = true
-    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
       "taskType", denyReason, denied, null))

     check[JobDataWrapper](1) { job =>
@@ -337,7 +340,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

     // Start a new attempt.
     val reattempt2 = newAttempt(denied, nextTaskId())
-    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
       reattempt2))

     // Succeed all tasks in stage 1.
@@ -350,7 +353,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     pending.foreach { task =>
       task.markFinished(TaskState.FINISHED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
         "taskType", Success, task, s1Metrics))
     }

@@ -414,13 +417,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     val s2Tasks = createTasks(4, execIds)
     s2Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId,
+        stages.last.attemptNumber,
+        task))
     }

     time += 1
     s2Tasks.foreach { task =>
       task.markFinished(TaskState.FAILED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId,
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber,
         "taskType", TaskResultLost, task, null))
     }

@@ -455,7 +460,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

     // - Re-submit stage 2, all tasks, and succeed them and the stage.
     val oldS2 = stages.last
-    val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks,
+    val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks,
       oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)

     time += 1
@@ -466,14 +471,14 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val newS2Tasks = createTasks(4, execIds)

     newS2Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task))
     }

     time += 1
     newS2Tasks.foreach { task =>
       task.markFinished(TaskState.FINISHED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success,
-        task, null))
+      listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType",
+        Success, task, null))
     }

     time += 1
@@ -522,14 +527,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val j2s2Tasks = createTasks(4, execIds)

     j2s2Tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId,
+      listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId,
+        j2Stages.last.attemptNumber,
         task))
     }

     time += 1
     j2s2Tasks.foreach { task =>
       task.markFinished(TaskState.FINISHED, time)
-      listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptId,
+      listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptNumber,
         "taskType", Success, task, null))
     }

@@ -919,13 +925,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     time += 1
     val tasks = createTasks(2, Array("1"))
     tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
     }
     assert(store.count(classOf[TaskDataWrapper]) === 2)

     // Start a 3rd task. The finished tasks should be deleted.
     createTasks(1, Array("1")).foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
     }
     assert(store.count(classOf[TaskDataWrapper]) === 2)
     intercept[NoSuchElementException] {
@@ -934,7 +940,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {

     // Start a 4th task. The first task should be deleted, even if it's still running.
     createTasks(1, Array("1")).foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+      listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
     }
     assert(store.count(classOf[TaskDataWrapper]) === 2)
     intercept[NoSuchElementException] {
@@ -960,7 +966,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }

-  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
+  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)

   private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
     val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ class SQLAppStatusListener(
     // Reset the metrics tracking object for the new attempt.
     Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
       metrics.taskMetrics.clear()
-      metrics.attemptId = event.stageInfo.attemptId
+      metrics.attemptId = event.stageInfo.attemptNumber
     }
   }
