From 51d1fbe464fa49f4503435f0f10790d8f1ec35ad Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Wed, 9 Nov 2016 11:15:38 +0800
Subject: [PATCH 1/2] If outputFormat is Configurable, we should call setConf
 on it.

---
 .../io/HadoopMapReduceCommitProtocol.scala         |  9 ++++++++-
 .../internal/io/SparkHadoopMapReduceWriter.scala   |  7 ++++++-
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala   | 15 +++++++++++++++
 3 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index d643a32af0314..5007760b11248 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal.io
 
 import java.util.Date
 
+import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
@@ -42,7 +43,13 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var committer: OutputCommitter = _
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val format = context.getOutputFormatClass.newInstance
+    // If the OutputFormat is Configurable, we should set the conf on it.
+    format match {
+      case c: Configurable => c.setConf(context.getConfiguration)
+      case _ => ()
+    }
+    format.getOutputCommitter(context)
   }
 
   override def newTaskTempFile(
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index a405c44e1093d..72914b4849af9 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -23,7 +23,7 @@ import java.util.{Date, Locale}
 import scala.reflect.ClassTag
 import scala.util.DynamicVariable
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, JobID}
 import org.apache.hadoop.mapreduce._
@@ -141,6 +141,11 @@ object SparkHadoopMapReduceWriter extends Logging {
 
     // Initiate the writer.
     val taskFormat = outputFormat.newInstance
+    // If the OutputFormat is Configurable, we should set the conf on it.
+    taskFormat match {
+      case c: Configurable => c.setConf(hadoopConf)
+      case _ => ()
+    }
     val writer = taskFormat.getRecordWriter(taskContext)
       .asInstanceOf[RecordWriter[K, V]]
     require(writer != null, "Unable to obtain RecordWriter")
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index fe547d4d9163e..02df157be377c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -509,6 +509,21 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       (2, ArrayBuffer(1))))
   }
 
+  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+
+    // No error, non-configurable formats still work
+    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+
+    /*
+     * Check that configurable formats get configured:
+     * ConfigTestFormat throws an exception if we try to write
+     * to it when setConf hasn't been called first.
+     * Assertion is in ConfigTestFormat.getRecordWriter.
+     */
+    pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()

From 4e79c377f99602adb5c3fd82f57de1773bb9b64d Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Wed, 9 Nov 2016 13:16:48 +0800
Subject: [PATCH 2/2] newInstance -> newInstance()

---
 .../spark/internal/io/HadoopMapReduceCommitProtocol.scala       | 2 +-
 .../apache/spark/internal/io/SparkHadoopMapReduceWriter.scala   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 5007760b11248..6b0bcb8f908b8 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -43,7 +43,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var committer: OutputCommitter = _
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val format = context.getOutputFormatClass.newInstance
+    val format = context.getOutputFormatClass.newInstance()
     // If the OutputFormat is Configurable, we should set the conf on it.
     format match {
       case c: Configurable => c.setConf(context.getConfiguration)
diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
index 72914b4849af9..796439276a22e 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala
@@ -140,7 +140,7 @@ object SparkHadoopMapReduceWriter extends Logging {
       SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
     // Initiate the writer.
-    val taskFormat = outputFormat.newInstance
+    val taskFormat = outputFormat.newInstance()
     // If the OutputFormat is Configurable, we should set the conf on it.
     taskFormat match {
       case c: Configurable => c.setConf(hadoopConf)
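
Note: the new test relies on ConfigTestFormat, which already exists in PairRDDFunctionsSuite. For readers without the suite at hand, here is a minimal sketch of a Configurable OutputFormat in the same spirit. This is an illustrative stand-in, not the suite's actual ConfigTestFormat; the class name, the no-op writer, and the no-op committer are assumptions.

import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, OutputFormat, RecordWriter, TaskAttemptContext}

// Hypothetical sketch: an OutputFormat that mixes in Configurable and fails
// fast if setConf was not called before it is used. This is the behavior the
// patch guards: both the writer path and setupCommitter now call setConf
// before getRecordWriter/getOutputCommitter.
class ConfigCheckingFormat[K, V] extends OutputFormat[K, V] with Configurable {
  private var conf: Configuration = _

  override def setConf(c: Configuration): Unit = { conf = c }
  override def getConf: Configuration = conf

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, V] = {
    // Before this patch, Spark could reach this point without configuring us.
    require(conf != null, "setConf was not called before getRecordWriter")
    new RecordWriter[K, V] {
      override def write(key: K, value: V): Unit = ()
      override def close(context: TaskAttemptContext): Unit = ()
    }
  }

  override def checkOutputSpecs(context: JobContext): Unit = ()

  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
    // setupCommitter in HadoopMapReduceCommitProtocol now calls setConf first, too.
    require(conf != null, "setConf was not called before getOutputCommitter")
    new OutputCommitter {
      override def setupJob(jobContext: JobContext): Unit = ()
      override def setupTask(taskContext: TaskAttemptContext): Unit = ()
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = ()
      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }
  }
}

With such a format, a call like pairs.saveAsNewAPIHadoopFile[ConfigCheckingFormat[Integer, Integer]]("ignored") would throw before this patch and succeed after it, which is the same property the suite's ConfigTestFormat asserts.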