Commit f81401e

Reza Safi authored and Marcelo Vanzin committed
[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol
I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark always used the rddId; it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId as the jobId on the driver's side and the stageId as the jobId on the executors' side. With this change, executors and the driver consistently use the rddId as the jobId. Also, during the Hadoop commit protocol, Spark now uses the actual stageId to check whether a stage can be committed, whereas before it used the executors' jobId for this check.

In addition to the existing unit tests, a test has been added to check that executors and the driver use the same JobId. The test failed before this change and passes after applying this fix.

Author: Reza Safi <[email protected]>

Closes #19848 from rezasafi/stagerddsimple.
1 parent 3927bb9 commit f81401e
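
To make the mismatch concrete: before this fix the driver derived its Hadoop JobID from rdd.id while executors derived theirs from the stage id, so a committer could observe two different JobIDs for the same write. Below is a minimal standalone sketch of that mismatch using the Hadoop mapreduce ID classes; the jobTrackerId string and the numeric ids are invented for illustration, not taken from a real run.

import org.apache.hadoop.mapreduce.{JobID => HadoopJobID, TaskAttemptID, TaskType}

object JobIdMismatchSketch {
  def main(args: Array[String]): Unit = {
    val jobTrackerId = "20171201120000"  // a createJobTrackerID-style timestamp string (made up)
    val rddId = 2     // the id the driver used for the Hadoop job
    val stageId = 5   // the id executors used before this fix; it differs once several RDDs/stages exist

    // Driver side: setupJob sees a JobID derived from the RDD id.
    val driverJobId = new HadoopJobID(jobTrackerId, rddId)

    // Executor side (pre-fix): the task attempt carried a JobID derived from the stage id.
    val executorJobId =
      new TaskAttemptID(jobTrackerId, stageId, TaskType.REDUCE, 0, 0).getJobID

    // A committer comparing the two ids sees them disagree.
    println(s"driver=$driverJobId executor=$executorJobId equal=${driverJobId == executorJobId}")
  }
}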

File tree

3 files changed: 53 additions and 8 deletions


core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala

Lines changed: 6 additions & 6 deletions
@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
       config: HadoopWriteConfigUtil[K, V]): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id

     // Set up a job.
     val jobTrackerId = createJobTrackerID(new Date())
-    val jobContext = config.createJobContext(jobTrackerId, stageId)
+    val jobContext = config.createJobContext(jobTrackerId, commitJobId)
     config.initOutputFormat(jobContext)

     // Assert the output format/key/value class is set in JobConf.
     config.assertConf(jobContext, rdd.conf)

-    val committer = config.createCommitter(stageId)
+    val committer = config.createCommitter(commitJobId)
     committer.setupJob(jobContext)

     // Try to write all RDD partitions as a Hadoop OutputFormat.
@@ -80,7 +80,7 @@ object SparkHadoopWriter extends Logging {
           context = context,
           config = config,
           jobTrackerId = jobTrackerId,
-          sparkStageId = context.stageId,
+          commitJobId = commitJobId,
           sparkPartitionId = context.partitionId,
           sparkAttemptNumber = context.attemptNumber,
           committer = committer,
@@ -102,14 +102,14 @@ object SparkHadoopWriter extends Logging {
       context: TaskContext,
       config: HadoopWriteConfigUtil[K, V],
       jobTrackerId: String,
-      sparkStageId: Int,
+      commitJobId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
     val taskContext = config.createTaskAttemptContext(
-      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
     committer.setupTask(taskContext)

     val (outputMetrics, callback) = initHadoopOutputMetrics(context)
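
The reason passing commitJobId through to the task side matters is that the Hadoop TaskAttemptID handed to the committer embeds a job id, and the committer on executors will only agree with the JobID the driver saw in setupJob if both sides were given the same number. The sketch below shows roughly that shape; it is an assumption made for illustration, not the real HadoopWriteConfigUtil implementation.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object TaskAttemptContextSketch {
  // Illustrative only: build a task attempt context whose TaskAttemptID embeds
  // the same job id (commitJobId) that the driver used when setting up the job.
  def sketchCreateTaskAttemptContext(
      conf: Configuration,
      jobTrackerId: String,
      commitJobId: Int,
      sparkPartitionId: Int,
      sparkAttemptNumber: Int): TaskAttemptContext = {
    val attemptId = new TaskAttemptID(
      jobTrackerId, commitJobId, TaskType.REDUCE, sparkPartitionId, sparkAttemptNumber)
    new TaskAttemptContextImpl(conf, attemptId)
  }
}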

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 3 additions & 2 deletions
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)

         if (canCommit) {
           performCommit()
@@ -80,7 +81,7 @@ object SparkHadoopMapRedUtil extends Logging {
           logInfo(message)
           // We need to abort the task so that the driver can reschedule new attempts, if necessary
           committer.abortTask(mrTaskContext)
-          throw new CommitDeniedException(message, jobId, splitId, taskAttemptNumber)
+          throw new CommitDeniedException(message, stageId, splitId, taskAttemptNumber)
         }
       } else {
         // Speculation is disabled or a user has chosen to manually bypass the commit coordination
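
On the coordination side, the change means the driver's OutputCommitCoordinator is now consulted with the real stage id from TaskContext rather than with whatever job id the executor happened to carry. The idea behind that check is first-committer-wins per stage and partition, so speculative duplicate attempts cannot both commit output. The class below is a toy sketch of that idea only, not Spark's OutputCommitCoordinator, which additionally tracks stage start/end and failed attempts and runs behind an RPC endpoint on the driver.

import scala.collection.mutable

class ToyCommitCoordinator {
  // (stageId, partitionId) -> attempt number that was authorized to commit
  private val authorized = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stageId: Int, partitionId: Int, attemptNumber: Int): Boolean = synchronized {
    authorized.get((stageId, partitionId)) match {
      case Some(winner) => winner == attemptNumber  // only the first authorized attempt may commit
      case None =>
        authorized((stageId, partitionId)) = attemptNumber
        true
    }
  }
}

With speculation enabled, two attempts of the same partition would both call canCommit here and only the first caller would get true; the second would see false and abort, mirroring the CommitDeniedException path above.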

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 44 additions & 0 deletions
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
   OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
   RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
 import org.apache.hadoop.util.Progressable
+import org.scalatest.Assertions

 import org.apache.spark._
 import org.apache.spark.Partitioner
@@ -524,6 +525,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }

+  test("The JobId on the driver and executors should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+      .map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }
+      .filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+    assert(JobID.jobid != -1)
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }

+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
+}
+
 class ConfigTestFormat() extends NewFakeFormat() with Configurable {

   var setConfCalled = false
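
The new test relies on the JobID holder object above: setupJob runs on the driver and records the job id it observed, each task's setupTask then asserts that the job id embedded in its TaskAttemptID matches that recorded value, and the final assert(JobID.jobid != -1) in the test confirms setupJob actually ran. Because the map and filter before the save make the final RDD's id differ from the stage id, the setupTask assertion fails without this fix and passes with it.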
