@@ -58,6 +58,9 @@ private[sql] abstract class BaseWriterContainer(
// This is only used on driver side.
@transient private val jobContext: JobContext = job

private val speculationEnabled: Boolean =
Review comment (Contributor): why bother having a private variable?

Reply (Contributor): I believe that we need this since the relation and SQLContext are transient and so not available on the executors.

relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)

// The following fields are initialized and used on both driver and executor side.
@transient protected var outputCommitter: OutputCommitter = _
@transient private var jobId: JobID = _
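To make the reply above concrete, here is a minimal, self-contained sketch of the serialization behaviour being relied on (the `Settings`/`Container` names are hypothetical stand-ins, not part of this patch): a `@transient` field is dropped when the object is shipped to an executor, so the flag has to be copied into an ordinary field while the driver-side objects are still reachable.

```scala
import java.io._

// Hypothetical stand-in for the real relation/SQLContext object graph.
class Settings(val speculationEnabled: Boolean) extends Serializable

class Container(@transient private val settings: Settings) extends Serializable {
  // Evaluated on the "driver" while `settings` is still reachable; the plain
  // Boolean field is serialized along with the container.
  private val speculationEnabled: Boolean = settings.speculationEnabled

  // After deserialization the @transient field is null, but the captured flag survives.
  def report(): String =
    s"settings = ${Option(settings)}, speculationEnabled = $speculationEnabled"
}

object TransientDemo extends App {
  val original = new Container(new Settings(speculationEnabled = true))

  // Round-trip through Java serialization, mimicking the driver -> executor hop.
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(original)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  val copy = in.readObject().asInstanceOf[Container]

  println(copy.report()) // settings = None, speculationEnabled = true
}
```

The patch applies the same idea: `speculationEnabled` is computed eagerly from `relation.sqlContext.sparkContext.conf` on the driver, while the `@transient` members above remain driver-only.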
@@ -126,10 +129,21 @@ private[sql] abstract class BaseWriterContainer(
// associated with the file output format since it is not safe to use a custom
// committer for appending. For example, in S3, direct parquet output committer may
// leave partial data in the destination dir when the appending job fails.
//
// See SPARK-8578 for more details
logInfo(
s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
} else if (speculationEnabled) {
// When speculation is enabled, it's not safe to use customized output committer classes,
// especially direct output committers (e.g. `DirectParquetOutputCommitter`).
//
// See SPARK-9899 for more details.
logInfo(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"because spark.speculation is configured to be true.")
defaultOutputCommitter
} else {
val committerClass = context.getConfiguration.getClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
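Taken together with the existing append branch (SPARK-8578), the selection logic in this method boils down to the rule below. This is a dependency-free restatement with hypothetical types, not the actual Spark API:

```scala
object CommitterSelection {
  sealed trait CommitterChoice
  case object DefaultCommitter extends CommitterChoice
  final case class ConfiguredCommitter(className: String) extends CommitterChoice

  // Append mode or speculation forces the safe default committer; only otherwise
  // is the committer class configured via SQLConf.OUTPUT_COMMITTER_CLASS honored.
  def chooseCommitter(
      isAppend: Boolean,
      speculationEnabled: Boolean,
      configuredClass: Option[String]): CommitterChoice =
    if (isAppend || speculationEnabled) {
      DefaultCommitter // SPARK-8578 (append) and SPARK-9899 (speculation)
    } else {
      configuredClass.map(ConfiguredCommitter).getOrElse(DefaultCommitter)
    }
}

// CommitterSelection.chooseCommitter(isAppend = false, speculationEnabled = true,
//   configuredClass = Some("DirectParquetOutputCommitter"))   // => DefaultCommitter
```

In the real code the fallback is also logged, as the `logInfo` calls above show.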
@@ -570,6 +570,40 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}

test("SPARK-9899 Disable customized output committer when speculation is on") {
val clonedConf = new Configuration(configuration)
val speculationEnabled =
sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)

try {
withTempPath { dir =>
// Enables task speculation
sqlContext.sparkContext.conf.set("spark.speculation", "true")

// Uses a customized output committer which always fails
configuration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)

// Code below shouldn't throw since customized output committer should be disabled.
val df = sqlContext.range(10).coalesce(1)
df.write.format(dataSourceName).save(dir.getCanonicalPath)
checkAnswer(
sqlContext
.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.load(dir.getCanonicalPath),
df)
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
configuration.clear()
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
}

// This class is used to test SPARK-8578. We should not use any custom output committer when
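The test above installs `AlwaysFailOutputCommitter`, whose definition is cut off by this excerpt. As a rough sketch, assuming it mirrors the existing SPARK-8578 helper (the real body may differ), such a committer can simply extend `FileOutputCommitter` and throw on job commit, so the test fails loudly whenever the customized committer is picked up despite speculation being enabled:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

// Sketch only: a FileOutputCommitter whose job commit always fails.
class AlwaysFailOutputCommitter(
    outputPath: Path,
    context: TaskAttemptContext)
  extends FileOutputCommitter(outputPath, context) {

  override def commitJob(context: JobContext): Unit = {
    sys.error("Intentional job commit failure for testing purposes.")
  }
}
```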