Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@ object SparkHadoopMapRedUtil extends Logging {
* the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
* details).
*
* Output commit coordinator is only contacted when the following two configurations are both set
* to `true`:
*
* - `spark.speculation`
* - `spark.hadoop.outputCommitCoordination.enabled`
* Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
* is set to true (which is the default).
*/
def commitTask(
committer: MapReduceOutputCommitter,
Expand All @@ -64,11 +61,10 @@ object SparkHadoopMapRedUtil extends Logging {
if (committer.needsTaskCommit(mrTaskContext)) {
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
// We only need to coordinate with the driver if there are multiple concurrent task
// attempts, which should only occur if speculation is enabled
val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
// We only need to coordinate with the driver if there are concurrent task attempts.
// Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
}

if (shouldCoordinateWithDriver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite
super.beforeAll()
val conf = new SparkConf()
.set("master", "local[2,4]")
.set("spark.speculation", "true")
.set("spark.hadoop.outputCommitCoordination.enabled", "true")
.set("spark.hadoop.mapred.output.committer.class",
classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
sc = new SparkContext("local[2, 4]", "test", conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
.set("spark.speculation", "true")
.set("spark.hadoop.outputCommitCoordination.enabled", "true")
sc = new SparkContext(conf) {
override private[spark] def createSparkEnv(
conf: SparkConf,
Expand Down