@@ -762,7 +762,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
762762 outputFormatClass : Class [_ <: NewOutputFormat [_, _]],
763763 conf : Configuration = self.context.hadoopConfiguration)
764764 {
765- val job = new NewAPIHadoopJob (conf)
765+ // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
766+ val hadoopConf = conf
767+ val job = new NewAPIHadoopJob (hadoopConf)
766768 job.setOutputKeyClass(keyClass)
767769 job.setOutputValueClass(valueClass)
768770 job.setOutputFormatClass(outputFormatClass)
@@ -795,22 +797,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
795797 outputFormatClass : Class [_ <: OutputFormat [_, _]],
796798 conf : JobConf = new JobConf (self.context.hadoopConfiguration),
797799 codec : Option [Class [_ <: CompressionCodec ]] = None ) {
798- conf.setOutputKeyClass(keyClass)
799- conf.setOutputValueClass(valueClass)
800+ // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
801+ val hadoopConf = conf
802+ hadoopConf.setOutputKeyClass(keyClass)
803+ hadoopConf.setOutputValueClass(valueClass)
800804 // Doesn't work in Scala 2.9 due to what may be a generics bug
801805 // TODO: Should we uncomment this for Scala 2.10?
802806 // conf.setOutputFormat(outputFormatClass)
803- conf .set(" mapred.output.format.class" , outputFormatClass.getName)
807+ hadoopConf .set(" mapred.output.format.class" , outputFormatClass.getName)
804808 for (c <- codec) {
805- conf .setCompressMapOutput(true )
806- conf .set(" mapred.output.compress" , " true" )
807- conf .setMapOutputCompressorClass(c)
808- conf .set(" mapred.output.compression.codec" , c.getCanonicalName)
809- conf .set(" mapred.output.compression.type" , CompressionType .BLOCK .toString)
809+ hadoopConf .setCompressMapOutput(true )
810+ hadoopConf .set(" mapred.output.compress" , " true" )
811+ hadoopConf .setMapOutputCompressorClass(c)
812+ hadoopConf .set(" mapred.output.compression.codec" , c.getCanonicalName)
813+ hadoopConf .set(" mapred.output.compression.type" , CompressionType .BLOCK .toString)
810814 }
811- conf.setOutputCommitter(classOf [FileOutputCommitter ])
812- FileOutputFormat .setOutputPath(conf, SparkHadoopWriter .createPathFromString(path, conf))
813- saveAsHadoopDataset(conf)
815+ hadoopConf.setOutputCommitter(classOf [FileOutputCommitter ])
816+ FileOutputFormat .setOutputPath(hadoopConf,
817+ SparkHadoopWriter .createPathFromString(path, hadoopConf))
818+ saveAsHadoopDataset(hadoopConf)
814819 }
815820
816821 /**
@@ -820,7 +825,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
820825 * configured for a Hadoop MapReduce job.
821826 */
822827 def saveAsNewAPIHadoopDataset (conf : Configuration ) {
823- val job = new NewAPIHadoopJob (conf)
828+ // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
829+ val hadoopConf = conf
830+ val job = new NewAPIHadoopJob (hadoopConf)
824831 val formatter = new SimpleDateFormat (" yyyyMMddHHmm" )
825832 val jobtrackerID = formatter.format(new Date ())
826833 val stageId = self.id
@@ -877,9 +884,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
877884 * MapReduce job.
878885 */
879886 def saveAsHadoopDataset (conf : JobConf ) {
880- val outputFormatInstance = conf.getOutputFormat
881- val keyClass = conf.getOutputKeyClass
882- val valueClass = conf.getOutputValueClass
887+ // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
888+ val hadoopConf = conf
889+ val outputFormatInstance = hadoopConf.getOutputFormat
890+ val keyClass = hadoopConf.getOutputKeyClass
891+ val valueClass = hadoopConf.getOutputValueClass
883892 if (outputFormatInstance == null ) {
884893 throw new SparkException (" Output format class not set" )
885894 }
@@ -889,18 +898,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
889898 if (valueClass == null ) {
890899 throw new SparkException (" Output value class not set" )
891900 }
892- SparkHadoopUtil .get.addCredentials(conf )
901+ SparkHadoopUtil .get.addCredentials(hadoopConf )
893902
894903 logDebug(" Saving as hadoop file of type (" + keyClass.getSimpleName + " , " +
895904 valueClass.getSimpleName + " )" )
896905
897906 if (self.conf.getBoolean(" spark.hadoop.validateOutputSpecs" , true )) {
898907 // FileOutputFormat ignores the filesystem parameter
899- val ignoredFs = FileSystem .get(conf )
900- conf .getOutputFormat.checkOutputSpecs(ignoredFs, conf )
908+ val ignoredFs = FileSystem .get(hadoopConf )
909+ hadoopConf .getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf )
901910 }
902911
903- val writer = new SparkHadoopWriter (conf )
912+ val writer = new SparkHadoopWriter (hadoopConf )
904913 writer.preSetup()
905914
906915 def writeToFile (context : TaskContext , iter : Iterator [(K , V )]) {
0 commit comments