
Commit cbe4d76

Preserve attemptId behavior and deprecate it:

- Introduce new `attemptNumber` and `taskAttemptId` methods to avoid ambiguity.
- Change `attemptNumber` to return Int instead of Long, since it was being treated as an Int elsewhere.
- Add more Javadocs.
- Add Mima excludes for the new methods.
1 parent b2dffa3 commit cbe4d76
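The core of the change is splitting the old overloaded attemptId into two distinct concepts. A hedged migration sketch in Scala (the ctx value is assumed to be obtained inside a running task; nothing here is from the commit itself):

import org.apache.spark.TaskContext

// Sketch only: TaskContext.get() returns the context of the currently
// running task (and null outside of one).
val ctx: TaskContext = TaskContext.get()

// Deprecated accessor: kept for compatibility, forwards to taskAttemptId.
val legacy: Long = ctx.attemptId

// Disambiguated accessors introduced by this commit:
val retryCount: Int = ctx.attemptNumber // 0 on the first attempt, 1 on the first retry, ...
val uniqueId: Long = ctx.taskAttemptId  // unique per attempt within the SparkContext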


20 files changed (+75, -49 lines)


core/src/main/java/org/apache/spark/TaskContext.java

Lines changed: 16 additions & 6 deletions
@@ -62,7 +62,7 @@ static void unset() {
    */
   public abstract boolean isInterrupted();
 
-  /** @deprecated: use isRunningLocally() */
+  /** @deprecated use {@link #isRunningLocally()} */
   @Deprecated
   public abstract boolean runningLocally();
 
@@ -87,28 +87,38 @@ static void unset() {
    * is for HadoopRDD to register a callback to close the input stream.
    * Will be called in any situation - success, failure, or cancellation.
    *
-   * @deprecated: use addTaskCompletionListener
+   * @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
    *
    * @param f Callback function.
    */
   @Deprecated
   public abstract void addOnCompleteCallback(final Function0<Unit> f);
 
+  /**
+   * The ID of the stage that this task belongs to.
+   */
   public abstract int stageId();
 
+  /**
+   * The ID of the RDD partition that is computed by this task.
+   */
   public abstract int partitionId();
 
   /**
-   * An ID recording how many times this task has been attempted. The first task attempt will be
-   * assigned attemptId = 0, and subsequent attempts will have increasing IDs.
+   * How many times this task has been attempted. The first task attempt will be assigned
+   * attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
    */
+  public abstract int attemptNumber();
+
+  /** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
+  @Deprecated
   public abstract long attemptId();
 
   /**
    * An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
-   * will share the same task ID).
+   * will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
    */
-  public abstract long taskId();
+  public abstract long taskAttemptId();
 
   /** ::DeveloperApi:: */
   @DeveloperApi
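To watch the two values diverge, a small job can fail a task once so that its retry reports attemptNumber = 1 with a fresh taskAttemptId. A hedged sketch (the master string, partition count, and failure injection are all illustrative; "local[2, 3]" permits up to 3 task failures so the retry actually runs):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2, 3]").setAppName("attempt-demo"))

sc.parallelize(1 to 2, 2).foreach { _ =>
  val ctx = TaskContext.get()
  println(s"partition=${ctx.partitionId} attemptNumber=${ctx.attemptNumber} " +
    s"taskAttemptId=${ctx.taskAttemptId}")
  // Fail partition 0 exactly once; the scheduler's retry prints attemptNumber = 1.
  if (ctx.partitionId == 0 && ctx.attemptNumber == 0) {
    throw new RuntimeException("simulated failure")
  }
}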

core/src/main/scala/org/apache/spark/TaskContextImpl.scala

Lines changed: 5 additions & 2 deletions
@@ -25,13 +25,16 @@ import scala.collection.mutable.ArrayBuffer
 private[spark] class TaskContextImpl(
     val stageId: Int,
     val partitionId: Int,
-    val taskId: Long,
-    val attemptId: Long,
+    override val taskAttemptId: Long,
+    override val attemptNumber: Int,
     val runningLocally: Boolean = false,
     val taskMetrics: TaskMetrics = TaskMetrics.empty)
   extends TaskContext
   with Logging {
 
+  // For backwards-compatibility; this method is now deprecated as of 1.3.0.
+  override def attemptId: Long = taskAttemptId
+
   // List of callback functions to execute when the task completes.
   @transient private val onCompleteCallbacks = new ArrayBuffer[TaskCompletionListener]
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -71,7 +71,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       val ser = env.closureSerializer.newInstance()
       val taskDesc = ser.deserialize[TaskDescription](data.value)
       logInfo("Got assigned task " + taskDesc.taskId)
-      executor.launchTask(this, taskId = taskDesc.taskId, attemptId = taskDesc.attemptId,
+      executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
         taskDesc.name, taskDesc.serializedTask)
     }

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 5 additions & 5 deletions
@@ -110,11 +110,11 @@ private[spark] class Executor(
   def launchTask(
       context: ExecutorBackend,
       taskId: Long,
-      attemptId: Long,
+      attemptNumber: Int,
       taskName: String,
       serializedTask: ByteBuffer) {
-    val tr =
-      new TaskRunner(context, taskId = taskId, attemptId = attemptId, taskName, serializedTask)
+    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
+      serializedTask)
     runningTasks.put(taskId, tr)
     threadPool.execute(tr)
   }
@@ -141,7 +141,7 @@ private[spark] class Executor(
   class TaskRunner(
       execBackend: ExecutorBackend,
       val taskId: Long,
-      val attemptId: Long,
+      val attemptNumber: Int,
       taskName: String,
       serializedTask: ByteBuffer)
     extends Runnable {
@@ -189,7 +189,7 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
-        val value = task.run(taskId = taskId, attemptId = attemptId)
+        val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
         val taskFinish = System.currentTimeMillis()
 
         // If the task has been killed, let's fail it.
core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala

Lines changed: 5 additions & 4 deletions
@@ -77,13 +77,14 @@ private[spark] class MesosExecutorBackend
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
     val taskId = taskInfo.getTaskId.getValue.toLong
-    val attemptIdPlusData = taskInfo.getData.asReadOnlyByteBuffer()
-    val attemptId = attemptIdPlusData.getLong // Updates the position by 8 bytes
-    val data = attemptIdPlusData.slice() // Subsequence starting at the current position
+    val attemptNumberPlusData = taskInfo.getData.asReadOnlyByteBuffer()
+    val attemptNumber = attemptNumberPlusData.getInt // Updates the position by 4 bytes
+    val data = attemptNumberPlusData.slice() // Subsequence starting at the current position
     if (executor == null) {
       logError("Received launchTask but executor was null")
     } else {
-      executor.launchTask(this, taskId = taskId, attemptId = attemptId, taskInfo.getName, data)
+      executor.launchTask(this, taskId = taskId, attemptNumber = attemptNumber, taskInfo.getName,
+        data)
     }
   }
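The backend depends on a small framing convention: the scheduler prepends the attempt number to the serialized task bytes, and the change above shrinks that header from 8 bytes (Long) to 4 (Int). A standalone sketch of the framing (the payload and value are made up):

import java.nio.ByteBuffer

val payload = "serialized task".getBytes("UTF-8")
val framed = ByteBuffer.allocate(4 + payload.length)
framed.putInt(7)  // attemptNumber header: an Int now, a Long before this commit
framed.put(payload)
framed.flip()

// Executor side, mirroring launchTask above:
val ro = framed.asReadOnlyBuffer()
val attemptNumber = ro.getInt // advances the position by 4 bytes
val data = ro.slice()         // remaining bytes: the serialized task
assert(attemptNumber == 7 && data.remaining == payload.length)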

core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala

Lines changed: 3 additions & 2 deletions
@@ -95,7 +95,8 @@ private[spark] object CheckpointRDD extends Logging {
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
-    val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
+    val tempOutputPath =
+      new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptNumber)
 
     if (fs.exists(tempOutputPath)) {
       throw new IOException("Checkpoint failed: temporary path " +
@@ -119,7 +120,7 @@ private[spark] object CheckpointRDD extends Logging {
       logInfo("Deleting tempOutputPath " + tempOutputPath)
       fs.delete(tempOutputPath, false)
       throw new IOException("Checkpoint failed: failed to save output of task: "
-        + ctx.attemptId + " and final output path does not exist")
+        + ctx.attemptNumber + " and final output path does not exist")
     } else {
       // Some other copy of this task must've finished before us and renamed it
       logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -229,7 +229,7 @@ class HadoopRDD[K, V](
       var reader: RecordReader[K, V] = null
       val inputFormat = getInputFormat(jobConf)
       HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
-        context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
+        context.stageId, theSplit.index, context.attemptNumber, jobConf)
       reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
 
       // Register an on-task-completion callback to close the input stream.

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 1 addition & 4 deletions
@@ -971,12 +971,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
       val config = wrappedConf.value
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
       /* "reduce task" <split #> <attempt # = spark task #> */
       val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
-        attemptNumber)
+        context.attemptNumber)
       val hadoopContext = newTaskAttemptContext(config, attemptId)
       val format = outfmt.newInstance
       format match {
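The deleted comment explains the deleted workaround: Hadoop's TaskAttemptID wants a 32-bit attempt number, while the old attemptId was a Long. A standalone sketch of the fold that is no longer needed now that attemptNumber is already an Int:

// The removed workaround, isolated: fold a Long id into a non-negative Int.
def foldToInt(attemptId: Long): Int = (attemptId % Int.MaxValue).toInt

assert(foldToInt(0L) == 0)
assert(foldToInt(Int.MaxValue.toLong + 5) == 5)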

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 2 additions & 2 deletions
@@ -634,8 +634,8 @@ class DAGScheduler(
       try {
         val rdd = job.finalStage.rdd
         val split = rdd.partitions(job.partitions(0))
-        val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskId = 0,
-          attemptId = 0, runningLocally = true)
+        val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskAttemptId = 0,
+          attemptNumber = 0, runningLocally = true)
         TaskContextHelper.setTaskContext(taskContext)
         try {
           val result = job.func(taskContext, rdd.iterator(split, taskContext))

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 10 additions & 3 deletions
@@ -44,9 +44,16 @@ import org.apache.spark.util.Utils
  */
 private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
 
-  final def run(taskId: Long, attemptId: Long): T = {
-    context = new TaskContextImpl(stageId = stageId, partitionId = partitionId, taskId = taskId,
-      attemptId = attemptId, runningLocally = false)
+  /**
+   * Called by Executor to run this task.
+   *
+   * @param taskAttemptId an identifier for this task attempt that is unique within a SparkContext.
+   * @param attemptNumber how many times this task has been attempted (0 for the first attempt)
+   * @return the result of the task
+   */
+  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
+    context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
+      taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
     TaskContextHelper.setTaskContext(context)
     context.taskMetrics.hostname = Utils.localHostName()
     taskThread = Thread.currentThread()
