core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal.io
 
 import java.util.Date
 
+import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
@@ -42,7 +43,13 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var committer: OutputCommitter = _
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val format = context.getOutputFormatClass.newInstance()
+    // If the OutputFormat is Configurable, set the Configuration on it before use.
+    format match {
+      case c: Configurable => c.setConf(context.getConfiguration)
+      case _ => ()
+    }
+    format.getOutputCommitter(context)
   }
 
   override def newTaskTempFile(
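Review note: the `Configurable` check above matters because `Class.newInstance()` never supplies a Hadoop `Configuration`; Hadoop itself performs the same check inside `ReflectionUtils.newInstance`. As a minimal, self-contained sketch of the idiom (the helper name `instantiateConfigured` is illustrative, not part of the patch):

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}

// Illustrative helper mirroring the idiom in setupCommitter above:
// reflective instantiation drops the Configuration, so Configurable
// instances must have one injected explicitly before first use.
def instantiateConfigured[T](clazz: Class[T], conf: Configuration): T = {
  val instance = clazz.newInstance()
  instance match {
    case c: Configurable => c.setConf(conf)
    case _ => ()
  }
  instance
}
```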
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -23,7 +23,7 @@ import java.util.{Date, Locale}
 import scala.reflect.ClassTag
 import scala.util.DynamicVariable
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, JobID}
 import org.apache.hadoop.mapreduce._
@@ -140,7 +140,12 @@ object SparkHadoopMapReduceWriter extends Logging {
     SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
     // Initiate the writer.
-    val taskFormat = outputFormat.newInstance
+    val taskFormat = outputFormat.newInstance()
+    // If the OutputFormat is Configurable, set the Configuration on it before use.
+    taskFormat match {
+      case c: Configurable => c.setConf(hadoopConf)
+      case _ => ()
+    }
     val writer = taskFormat.getRecordWriter(taskContext)
       .asInstanceOf[RecordWriter[K, V]]
     require(writer != null, "Unable to obtain RecordWriter")
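This is the task-side path exercised by `saveAsNewAPIHadoopFile`: before this change, a format that reads settings from the job `Configuration` inside `getRecordWriter` would see an unset one. A usage-level sketch, assuming an existing `SparkContext` named `sc` (output path and data are made up):

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

// With the fix, any OutputFormat that implements Configurable receives
// hadoopConf via setConf before getRecordWriter is invoked; formats that
// are not Configurable, like this one, are unaffected.
val pairs = sc.parallelize(Seq((new IntWritable(1), new Text("a"))))
pairs.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]]("/tmp/seq-out")
```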
core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -509,6 +509,21 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       (2, ArrayBuffer(1))))
   }
 
+  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+
+    // No error: non-configurable formats still work.
+    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+
+    /*
+     * Check that configurable formats get configured:
+     * ConfigTestFormat throws an exception if we try to write
+     * to it when setConf has not been called first.
+     * The assertion is in ConfigTestFormat.getRecordWriter.
+     */
+    pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
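For readers without the rest of the suite at hand: `ConfigTestFormat` is defined elsewhere in PairRDDFunctionsSuite.scala as a `Configurable` variant of the suite's `NewFakeFormat`. A sketch of its approximate shape, reconstructed from the test comment above (details may differ from the actual suite code):

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}

// Approximate shape: remembers whether setConf ran and refuses to hand out
// a RecordWriter when it did not. NewFakeFormat is the suite's existing
// no-op OutputFormat[Integer, Integer].
class ConfigTestFormat extends NewFakeFormat() with Configurable {
  private var setConfCalled = false

  override def setConf(conf: Configuration): Unit = {
    setConfCalled = true
  }

  override def getConf: Configuration = null

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[Integer, Integer] = {
    assert(setConfCalled, "setConf was never called on ConfigTestFormat")
    super.getRecordWriter(context)
  }
}
```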