Commit 6d68b4f

bugfix - should update JobConf on JobContext/TaskContext creation.

1 parent: 4dffc34

File tree

1 file changed: +7 additions, -1 deletion

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala

Lines changed: 7 additions & 1 deletion
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils}

 /**
@@ -197,6 +197,9 @@ class SparkHadoopMapRedWriterConfig[K, V: ClassTag](conf: SerializableJobConf)
       jobId: Int,
       splitId: Int,
       taskAttemptId: Int): NewTaskAttemptContext = {
+    // Update JobConf.
+    HadoopRDD.addLocalConfiguration(jobTrackerId, jobId, splitId, taskAttemptId, conf.value)
+    // Create taskContext.
     val attemptId = new TaskAttemptID(jobTrackerId, jobId, TaskType.MAP, splitId, taskAttemptId)
     new TaskAttemptContextImpl(getConf(), attemptId)
   }
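
For context, HadoopRDD.addLocalConfiguration is an existing Spark helper that this fix reuses. A minimal sketch of its behavior, paraphrased from Spark's HadoopRDD companion object (exact details may vary between versions, so treat it as illustrative rather than the precise source):

  // Assumes: import org.apache.hadoop.mapred.{JobConf, JobID, TaskAttemptID, TaskID}
  //          import org.apache.hadoop.mapreduce.TaskType
  def addLocalConfiguration(
      jobTrackerId: String,
      jobId: Int,
      splitId: Int,
      attemptId: Int,
      conf: JobConf): Unit = {
    // Derive Hadoop job/task IDs from the Spark-side identifiers.
    val jobID = new JobID(jobTrackerId, jobId)
    val taId = new TaskAttemptID(new TaskID(jobID, TaskType.MAP, splitId), attemptId)

    // Stamp the per-task properties that Hadoop OutputFormats and
    // committers expect to find in the configuration.
    conf.set("mapreduce.task.id", taId.getTaskID.toString)
    conf.set("mapreduce.task.attempt.id", taId.toString)
    conf.setBoolean("mapreduce.task.ismap", true)
    conf.setInt("mapreduce.task.partition", splitId)
    conf.set("mapreduce.job.id", jobID.toString)
  }

If the helper works roughly as above, the added call is what the commit message means by updating the JobConf on TaskContext creation: without it, the configuration handed to the task attempt context would be missing these per-task IDs.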
@@ -206,6 +209,9 @@ class SparkHadoopMapRedWriterConfig[K, V: ClassTag](conf: SerializableJobConf)
   // --------------------------------------------------------------------------

   def createCommitter(jobId: Int): HadoopMapReduceCommitProtocol = {
+    // Update JobConf.
+    HadoopRDD.addLocalConfiguration("", 0, 0, 0, getConf())
+    // Create commit protocol.
     FileCommitProtocol.instantiate(
       className = classOf[HadoopMapRedCommitProtocol].getName,
       jobId = jobId.toString,
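
A note on the placeholder arguments in createCommitter: at committer-setup time there is no real task attempt yet, so the call passes an empty jobTrackerId and zeroed IDs. Assuming the helper sketched above, this still seeds the job-level keys (e.g. mapreduce.job.id) on the JobConf before the commit protocol is instantiated, rather than leaving them unset.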
