Skip to content

Commit 64fbdf1

Browse files
jiangxb1987 and rxin
authored and committed
[SPARK-18191][CORE][FOLLOWUP] Call setConf if OutputFormat is Configurable.
## What changes were proposed in this pull request? We should call `setConf` if the `OutputFormat` is `Configurable`; this must be done before we create the `OutputCommitter` and the `RecordWriter`. This is a follow-up to #15769; see the discussion [here](https://github.com/apache/spark/pull/15769/files#r87064229). ## How was this patch tested? Added a test for this case in `PairRDDFunctionsSuite`. Author: jiangxingbo <[email protected]> Closes #15823 from jiangxb1987/config-format.
1 parent d8b81f7 commit 64fbdf1

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ package org.apache.spark.internal.io
1919

2020
import java.util.Date
2121

22+
import org.apache.hadoop.conf.Configurable
2223
import org.apache.hadoop.fs.Path
2324
import org.apache.hadoop.mapreduce._
2425
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
@@ -42,7 +43,13 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
4243
@transient private var committer: OutputCommitter = _
4344

4445
protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
45-
context.getOutputFormatClass.newInstance().getOutputCommitter(context)
46+
val format = context.getOutputFormatClass.newInstance()
47+
// If OutputFormat is Configurable, we should set conf to it.
48+
format match {
49+
case c: Configurable => c.setConf(context.getConfiguration)
50+
case _ => ()
51+
}
52+
format.getOutputCommitter(context)
4653
}
4754

4855
override def newTaskTempFile(

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.{Date, Locale}
2323
import scala.reflect.ClassTag
2424
import scala.util.DynamicVariable
2525

26-
import org.apache.hadoop.conf.Configuration
26+
import org.apache.hadoop.conf.{Configurable, Configuration}
2727
import org.apache.hadoop.fs.Path
2828
import org.apache.hadoop.mapred.{JobConf, JobID}
2929
import org.apache.hadoop.mapreduce._
@@ -140,7 +140,12 @@ object SparkHadoopMapReduceWriter extends Logging {
140140
SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
141141

142142
// Initiate the writer.
143-
val taskFormat = outputFormat.newInstance
143+
val taskFormat = outputFormat.newInstance()
144+
// If OutputFormat is Configurable, we should set conf to it.
145+
taskFormat match {
146+
case c: Configurable => c.setConf(hadoopConf)
147+
case _ => ()
148+
}
144149
val writer = taskFormat.getRecordWriter(taskContext)
145150
.asInstanceOf[RecordWriter[K, V]]
146151
require(writer != null, "Unable to obtain RecordWriter")

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,21 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
509509
(2, ArrayBuffer(1))))
510510
}
511511

512+
test("saveNewAPIHadoopFile should call setConf if format is configurable") {
513+
val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
514+
515+
// No error, non-configurable formats still work
516+
pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
517+
518+
/*
519+
* Check that configurable formats get configured:
520+
* ConfigTestFormat throws an exception if we try to write
521+
* to it when setConf hasn't been called first.
522+
* Assertion is in ConfigTestFormat.getRecordWriter.
523+
*/
524+
pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
525+
}
526+
512527
test("saveAsHadoopFile should respect configured output committers") {
513528
val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
514529
val conf = new JobConf()

0 commit comments

Comments
 (0)