From ba7fc4b39645939c44628ff3cdeed8f0dd3965a0 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 7 Apr 2016 15:23:03 -0700
Subject: [PATCH 1/4] Do not tie OutputCommitCoordinator to speculation

---
 .../spark/mapred/SparkHadoopMapRedUtil.scala      | 16 ++++++----------
 ...OutputCommitCoordinatorIntegrationSuite.scala  |  1 -
 .../scheduler/OutputCommitCoordinatorSuite.scala  |  1 -
 .../execution/datasources/WriterContainer.scala   |  3 ---
 4 files changed, 6 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 891facba33110..6311fffc8a158 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -33,11 +33,8 @@ object SparkHadoopMapRedUtil extends Logging {
    * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
    * details).
    *
-   * Output commit coordinator is only contacted when the following two configurations are both set
-   * to `true`:
-   *
-   *  - `spark.speculation`
-   *  - `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only contacted when `spark.hadoop.outputCommitCoordination.enabled`
+   * is set to true (which is the default).
    */
   def commitTask(
       committer: MapReduceOutputCommitter,
@@ -64,11 +61,10 @@ object SparkHadoopMapRedUtil extends Logging {
     if (committer.needsTaskCommit(mrTaskContext)) {
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+        // We only need to coordinate with the driver if there are multiple concurrent task attempts.
+        // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
       }
 
       if (shouldCoordinateWithDriver) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 9f41aca8a1e14..3e1ce4c1ce7fd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -38,7 +38,6 @@ class OutputCommitCoordinatorIntegrationSuite
     super.beforeAll()
     val conf = new SparkConf()
       .set("master", "local[2,4]")
-      .set("spark.speculation", "true")
       .set("spark.hadoop.mapred.output.committer.class",
         classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
     sc = new SparkContext("local[2, 4]", "test", conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index c461da65bdc43..a8776cf8018b6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -77,7 +77,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
-      .set("spark.speculation", "true")
     sc = new SparkContext(conf) {
       override private[spark] def createSparkEnv(
           conf: SparkConf,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index f6b7f0854bf22..832603f57d379 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -66,9 +66,6 @@ private[sql] abstract class BaseWriterContainer(
   // This is only used on driver side.
   @transient private val jobContext: JobContext = job
 
-  private val speculationEnabled: Boolean =
-    relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
-
   // The following fields are initialized and used on both driver and executor side.
   @transient protected var outputCommitter: OutputCommitter = _
   @transient private var jobId: JobID = _

From 8b207af12f29b87b236bc4380bae08e7589ea31b Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 7 Apr 2016 15:40:16 -0700
Subject: [PATCH 2/4] Super ensure we use coordinator in test suites

---
 .../scheduler/OutputCommitCoordinatorIntegrationSuite.scala    | 1 +
 .../apache/spark/scheduler/OutputCommitCoordinatorSuite.scala  | 1 +
 2 files changed, 2 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
index 3e1ce4c1ce7fd..601f1c378c41f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala
@@ -38,6 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite
     super.beforeAll()
     val conf = new SparkConf()
       .set("master", "local[2,4]")
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
       .set("spark.hadoop.mapred.output.committer.class",
         classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
     sc = new SparkContext("local[2, 4]", "test", conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index a8776cf8018b6..8e509de7677c3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -77,6 +77,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
     sc = new SparkContext(conf) {
       override private[spark] def createSparkEnv(
           conf: SparkConf,

From 22fafddc984b3dea1de8118c970138785b36f6ad Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 7 Apr 2016 15:41:30 -0700
Subject: [PATCH 3/4] Fix style

---
 .../scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 6311fffc8a158..607283a306b8f 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -33,7 +33,7 @@ object SparkHadoopMapRedUtil extends Logging {
    * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
    * details).
    *
-   * Output commit coordinator is only contacted when `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
    * is set to true (which is the default).
    */
   def commitTask(
@@ -61,7 +61,7 @@ object SparkHadoopMapRedUtil extends Logging {
     if (committer.needsTaskCommit(mrTaskContext)) {
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task attempts.
+        // We only need to coordinate with the driver if there are concurrent task attempts.
         // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
         // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
         sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)

From 076f6b5a9aa870627ddd4b40796cfa83649b5571 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 7 Apr 2016 15:49:13 -0700
Subject: [PATCH 4/4] Revert change to minimize conflicts

---
 .../spark/sql/execution/datasources/WriterContainer.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 832603f57d379..f6b7f0854bf22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -66,6 +66,9 @@ private[sql] abstract class BaseWriterContainer(
   // This is only used on driver side.
   @transient private val jobContext: JobContext = job
 
+  private val speculationEnabled: Boolean =
+    relation.sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
+
   // The following fields are initialized and used on both driver and executor side.
   @transient protected var outputCommitter: OutputCommitter = _
   @transient private var jobId: JobID = _
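
Note (illustrative, not part of the patch series): the net effect of these patches is that the commit-coordination decision no longer consults spark.speculation at all; coordination is on by default, and spark.hadoop.outputCommitCoordination.enabled remains the escape hatch. Below is a minimal standalone sketch of that decision, using only the real SparkConf.getBoolean API seen in the diffs; the object name and main method are hypothetical scaffolding for demonstration.

import org.apache.spark.SparkConf

object CommitCoordinationSketch {
  // Mirrors the post-patch logic in SparkHadoopMapRedUtil.commitTask:
  // coordinate with the driver unless the escape hatch is explicitly disabled.
  def shouldCoordinateWithDriver(conf: SparkConf): Boolean =
    conf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)

  def main(args: Array[String]): Unit = {
    // Default: coordination is on, even though spark.speculation is unset.
    val defaultConf = new SparkConf(loadDefaults = false)
    assert(shouldCoordinateWithDriver(defaultConf))

    // Escape hatch: explicitly opt out of coordination.
    val optOutConf = new SparkConf(loadDefaults = false)
      .set("spark.hadoop.outputCommitCoordination.enabled", "false")
    assert(!shouldCoordinateWithDriver(optOutConf))
  }
}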