
Commit 77ebae3

Committed by Andrew Or
[SPARK-14468] Always enable OutputCommitCoordinator
## What changes were proposed in this pull request?

`OutputCommitCoordinator` was introduced to deal with concurrent task attempts racing to write output, which can lead to data loss or corruption. For more detail, read the [JIRA description](https://issues.apache.org/jira/browse/SPARK-14468).

Before: `OutputCommitCoordinator` is enabled only if speculation is enabled.
After: `OutputCommitCoordinator` is always enabled.

Users may still disable this through `spark.hadoop.outputCommitCoordination.enabled`, but they really shouldn't...

## How was this patch tested?

`OutputCommitCoordinator*Suite`

Author: Andrew Or <[email protected]>

Closes #12244 from andrewor14/always-occ.

(cherry picked from commit 3e29e37)
Signed-off-by: Andrew Or <[email protected]>
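For context, this is what the escape hatch mentioned above looks like in practice. A minimal sketch: the config key `spark.hadoop.outputCommitCoordination.enabled` comes from this patch, while the master URL, app name, and the rest of the setup are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// After this patch the coordinator is on by default; setting the
// (undocumented) key below to "false" opts out. Doing so risks corrupt
// output when task attempts race, so it is only a safety valve.
val conf = new SparkConf()
  .setMaster("local[2]")          // illustrative master URL
  .setAppName("occ-escape-hatch") // illustrative app name
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")
val sc = new SparkContext(conf)
```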
1 parent 8a94a59 · commit 77ebae3

3 files changed: +8 additions, −12 deletions

core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 6 additions & 10 deletions

@@ -81,11 +81,8 @@ object SparkHadoopMapRedUtil extends Logging {
    * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
    * details).
    *
-   * Output commit coordinator is only contacted when the following two configurations are both set
-   * to `true`:
-   *
-   * - `spark.speculation`
-   * - `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+   * is set to true (which is the default).
    */
   def commitTask(
       committer: MapReduceOutputCommitter,
@@ -112,11 +109,10 @@ object SparkHadoopMapRedUtil extends Logging {
     if (committer.needsTaskCommit(mrTaskContext)) {
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+        // We only need to coordinate with the driver if there are concurrent task attempts.
+        // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
       }
 
       if (shouldCoordinateWithDriver) {
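For intuition, here is a schematic and deliberately simplified model of the arbitration the real `org.apache.spark.scheduler.OutputCommitCoordinator` performs on the driver when `shouldCoordinateWithDriver` is true: the first attempt to ask for a given partition wins, and later attempts for the same partition are denied, so at most one attempt ever commits its output. The class and member names below are hypothetical, not Spark's actual API.

```scala
import scala.collection.mutable

// Toy stand-in for the driver-side coordinator (illustrative only).
class ToyCommitCoordinator {
  // (stageId, partitionId) -> attempt number authorized to commit
  private val authorized = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stageId: Int, partitionId: Int, attemptNumber: Int): Boolean =
    synchronized {
      authorized.get((stageId, partitionId)) match {
        case Some(winner) => winner == attemptNumber // only the winner may proceed
        case None =>
          authorized((stageId, partitionId)) = attemptNumber // first asker wins
          true
      }
    }
}

val coordinator = new ToyCommitCoordinator
coordinator.canCommit(stageId = 0, partitionId = 3, attemptNumber = 0) // true
coordinator.canCommit(stageId = 0, partitionId = 3, attemptNumber = 1) // false: already claimed
```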

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -38,7 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite
     super.beforeAll()
     val conf = new SparkConf()
       .set("master", "local[2,4]")
-      .set("spark.speculation", "true")
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
       .set("spark.hadoop.mapred.output.committer.class",
         classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
     sc = new SparkContext("local[2, 4]", "test", conf)

core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -78,7 +78,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
-      .set("spark.speculation", "true")
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
     sc = new SparkContext(conf) {
       override private[spark] def createSparkEnv(
           conf: SparkConf,
