@@ -1018,6 +1018,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   /**
    * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
    * supporting the key and value types K and V in this RDD.
+   *
+   * Note that we should make sure our tasks are idempotent when speculation is enabled, i.e.
+   * do not use an output committer that writes data directly.
+   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 that shows the
+   * bad result of using a direct output committer with speculation enabled.
    */
   def saveAsHadoopFile(
       path: String,
@@ -1030,10 +1035,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val hadoopConf = conf
     hadoopConf.setOutputKeyClass(keyClass)
     hadoopConf.setOutputValueClass(valueClass)
-    // Doesn't work in Scala 2.9 due to what may be a generics bug
-    // TODO: Should we uncomment this for Scala 2.10?
-    // conf.setOutputFormat(outputFormatClass)
-    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+    conf.setOutputFormat(outputFormatClass)
     for (c <- codec) {
       hadoopConf.setCompressMapOutput(true)
       hadoopConf.set("mapred.output.compress", "true")
@@ -1047,6 +1049,19 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
     }

+    // When speculation is on and the output committer class name contains "Direct", warn
+    // users that they may lose data if they are using a direct output committer.
+    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
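
For context on how the new warning in saveAsHadoopFile surfaces from user code, here is a minimal, hypothetical driver sketch (the committer class name and output path are made up; the default FileOutputCommitter would not trigger the warning):

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver: speculation is on and a committer whose class name
// contains "Direct" is configured, so saveAsHadoopFile logs the new warning.
val sparkConf = new SparkConf()
  .setAppName("direct-committer-warning-demo")
  .set("spark.speculation", "true")
val sc = new SparkContext(sparkConf)
// "com.example.DirectOutputCommitter" is a made-up class name for illustration.
sc.hadoopConfiguration.set("mapred.output.committer.class",
  "com.example.DirectOutputCommitter")
sc.parallelize(Seq(("key", "value")))
  .saveAsHadoopFile[TextOutputFormat[String, String]]("/tmp/warn-demo-output")
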
@@ -1057,6 +1072,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * Configuration object for that storage system. The Conf should set an OutputFormat and any
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
+   *
+   * Note that we should make sure our tasks are idempotent when speculation is enabled, i.e.
+   * do not use an output committer that writes data directly.
+   * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 that shows the
+   * bad result of using a direct output committer with speculation enabled.
    */
   def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
@@ -1115,6 +1135,20 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
     val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
     val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+
+    // When speculation is on and the output committer class name contains "Direct", warn
+    // users that they may lose data if they are using a direct output committer.
+    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+    val outputCommitterClass = jobCommitter.getClass.getSimpleName
+    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+      val warningMessage =
+        s"$outputCommitterClass may be an output committer that writes data directly to " +
+          "the final location. Because speculation is enabled, this output committer may " +
+          "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+          "committer that does not have this behavior (e.g. FileOutputCommitter)."
+      logWarning(warningMessage)
+    }
+
     jobCommitter.setupJob(jobTaskContext)
     self.context.runJob(self, writeShard)
     jobCommitter.commitJob(jobTaskContext)
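
A note on the design: the two checks identify the committer differently. The saveAsHadoopFile path can only consult the configured class name, because the old mapred API resolves its committer later, while this new-API path already holds a committer instance and can inspect its runtime class. Roughly, assuming a hypothetical committer value in scope:

// `committer` stands for any instantiated Hadoop OutputCommitter (hypothetical).
// Matching on the simple class name is a heuristic: it flags committers such as
// DirectParquetOutputCommitter, but misses direct writers named differently.
val looksDirect = committer.getClass.getSimpleName.contains("Direct")
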
@@ -1129,7 +1163,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
     // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
     val hadoopConf = conf
-    val wrappedConf = new SerializableConfiguration(hadoopConf)
     val outputFormatInstance = hadoopConf.getOutputFormat
     val keyClass = hadoopConf.getOutputKeyClass
     val valueClass = hadoopConf.getOutputValueClass
@@ -1157,7 +1190,6 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     writer.preSetup()

     val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
-      val config = wrappedConf.value
       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt
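
To make the failure mode in SPARK-10063 concrete, a direct committer looks roughly like the sketch below (a hypothetical class, not part of this patch). Tasks write straight to the final output path and commit/abort are no-ops, so when speculation launches a duplicate attempt, the losing attempt's files are never cleaned up and can corrupt or overwrite the winner's output:

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// Hypothetical sketch of a direct committer (old mapred API), for illustration only.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // Tasks write straight to the final output path, so there is nothing to move on commit.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  // Crucially, abort cannot undo writes that already reached the final location; a
  // speculative attempt that loses the race leaves its partial files behind.
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}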