From 756486744564709f9e15b63cde0600567987a522 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 20 Aug 2015 02:32:00 +0800 Subject: [PATCH] Disables customized output committer when speculation is on --- .../datasources/WriterContainer.scala | 16 ++++++++- .../sql/sources/hadoopFsRelationSuites.scala | 34 +++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index e0147079e699..78f48a5cd72c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -58,6 +58,9 @@ private[sql] abstract class BaseWriterContainer( // This is only used on driver side. @transient private val jobContext: JobContext = job + private val speculationEnabled: Boolean = + relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false) + // The following fields are initialized and used on both driver and executor side. @transient protected var outputCommitter: OutputCommitter = _ @transient private var jobId: JobID = _ @@ -126,10 +129,21 @@ private[sql] abstract class BaseWriterContainer( // associated with the file output format since it is not safe to use a custom // committer for appending. For example, in S3, direct parquet output committer may // leave partial data in the destination dir when the the appending job fails. + // + // See SPARK-8578 for more details logInfo( - s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " + + s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + "for appending.") defaultOutputCommitter + } else if (speculationEnabled) { + // When speculation is enabled, it's not safe to use customized output committer classes, + // especially direct output committers (e.g. `DirectParquetOutputCommitter`). + // + // See SPARK-9899 for more details. + logInfo( + s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " + + "because spark.speculation is configured to be true.") + defaultOutputCommitter } else { val committerClass = context.getConfiguration.getClass( SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 8d0d9218ddd6..5bbca14bad32 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -570,6 +570,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils { df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t") } } + + test("SPARK-9899 Disable customized output committer when speculation is on") { + val clonedConf = new Configuration(configuration) + val speculationEnabled = + sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false) + + try { + withTempPath { dir => + // Enables task speculation + sqlContext.sparkContext.conf.set("spark.speculation", "true") + + // Uses a customized output committer which always fails + configuration.set( + SQLConf.OUTPUT_COMMITTER_CLASS.key, + classOf[AlwaysFailOutputCommitter].getName) + + // Code below shouldn't throw since customized output committer should be disabled. + val df = sqlContext.range(10).coalesce(1) + df.write.format(dataSourceName).save(dir.getCanonicalPath) + checkAnswer( + sqlContext + .read + .format(dataSourceName) + .option("dataSchema", df.schema.json) + .load(dir.getCanonicalPath), + df) + } + } finally { + // Hadoop 1 doesn't have `Configuration.unset` + configuration.clear() + clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString) + } + } } // This class is used to test SPARK-8578. We should not use any custom output committer when