Skip to content

Commit b32a31d

Browse files
liancheng authored and marmbrus committed
[SPARK-9899] [SQL] Disables customized output committer when speculation is on
Speculation hates direct output committer, as there are multiple corner cases that may cause data corruption and/or data loss. Please see this [PR comment] [1] for more details. [1]: #8191 (comment) Author: Cheng Lian <[email protected]> Closes #8317 from liancheng/spark-9899/speculation-hates-direct-output-committer. (cherry picked from commit f3ff4c4) Signed-off-by: Michael Armbrust <[email protected]> Conflicts: sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
1 parent d9dfd43 commit b32a31d

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,9 @@ private[sql] abstract class BaseWriterContainer(
5858
// This is only used on driver side.
5959
@transient private val jobContext: JobContext = job
6060

61+
private val speculationEnabled: Boolean =
62+
relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
63+
6164
// The following fields are initialized and used on both driver and executor side.
6265
@transient protected var outputCommitter: OutputCommitter = _
6366
@transient private var jobId: JobID = _
@@ -126,10 +129,21 @@ private[sql] abstract class BaseWriterContainer(
126129
// associated with the file output format since it is not safe to use a custom
127130
// committer for appending. For example, in S3, direct parquet output committer may
128131
// leave partial data in the destination dir when the the appending job fails.
132+
//
133+
// See SPARK-8578 for more details
129134
logInfo(
130-
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
135+
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
131136
"for appending.")
132137
defaultOutputCommitter
138+
} else if (speculationEnabled) {
139+
// When speculation is enabled, it's not safe to use customized output committer classes,
140+
// especially direct output committers (e.g. `DirectParquetOutputCommitter`).
141+
//
142+
// See SPARK-9899 for more details.
143+
logInfo(
144+
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
145+
"because spark.speculation is configured to be true.")
146+
defaultOutputCommitter
133147
} else {
134148
val committerClass = context.getConfiguration.getClass(
135149
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])

sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala

Lines changed: 34 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -553,6 +553,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
553553
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
554554
}
555555
}
556+
557+
test("SPARK-9899 Disable customized output committer when speculation is on") {
558+
val clonedConf = new Configuration(configuration)
559+
val speculationEnabled =
560+
sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
561+
562+
try {
563+
withTempPath { dir =>
564+
// Enables task speculation
565+
sqlContext.sparkContext.conf.set("spark.speculation", "true")
566+
567+
// Uses a customized output committer which always fails
568+
configuration.set(
569+
SQLConf.OUTPUT_COMMITTER_CLASS.key,
570+
classOf[AlwaysFailOutputCommitter].getName)
571+
572+
// Code below shouldn't throw since customized output committer should be disabled.
573+
val df = sqlContext.range(10).coalesce(1)
574+
df.write.format(dataSourceName).save(dir.getCanonicalPath)
575+
checkAnswer(
576+
sqlContext
577+
.read
578+
.format(dataSourceName)
579+
.option("dataSchema", df.schema.json)
580+
.load(dir.getCanonicalPath),
581+
df)
582+
}
583+
} finally {
584+
// Hadoop 1 doesn't have `Configuration.unset`
585+
configuration.clear()
586+
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
587+
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
588+
}
589+
}
556590
}
557591

558592
// This class is used to test SPARK-8578. We should not use any custom output committer when

0 commit comments

Comments (0)