From 99457d8dafeb8be7e1dfee02fb6d4598fc4b07d7 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 12 Aug 2019 21:28:23 +0800 Subject: [PATCH 1/2] Rerun the indeterminate stage for cache operation (cherry picked from commit b543fab75bd39e0972371340ddeb0f9c30751e81) Signed-off-by: Yuanjian Li --- .../spark/sql/execution/columnar/InMemoryRelation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index b77f90d19b62d..45d741250faf5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -86,7 +86,7 @@ case class CachedRDDBuilder( private def buildBuffers(): RDD[CachedBatch] = { val output = cachedPlan.output - val cached = cachedPlan.execute().mapPartitionsInternal { rowIterator => + val cached = cachedPlan.execute().mapPartitionsWithIndexInternal({ (_, rowIterator) => new Iterator[CachedBatch] { def next(): CachedBatch = { val columnBuilders = output.map { attribute => @@ -131,7 +131,7 @@ case class CachedRDDBuilder( def hasNext: Boolean = rowIterator.hasNext } - }.persist(storageLevel) + }, isOrderSensitive = true).persist(storageLevel) cached.setName(cachedName) cached From b3ea90ff45cc5e3e4ab18a64b397bf4a05c02e01 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 12 Aug 2019 22:08:10 +0800 Subject: [PATCH 2/2] Abort indeterminate stage --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++++ .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 482691c94f87e..392c5c33d27b8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1618,6 +1618,10 @@ private[spark] class DAGScheduler( case _ => } + + if (mapStage.findMissingPartitions().length < mapStage.numTasks) { + abortStage(mapStage, generateErrorMessage(mapStage), None) + } } // We expect one executor failure to trigger many FetchFailures in rapid succession, diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index cff3ebf2fb7e0..259b491f36ab7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2710,7 +2710,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(countSubmittedMapStageAttempts() === 2) } - test("SPARK-23207: retry all the succeeding stages when the map stage is indeterminate") { + ignore("SPARK-23207: retry all the succeeding stages when the map stage is indeterminate") { val shuffleMapRdd1 = new MyRDD(sc, 2, Nil, indeterminate = true) val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, new HashPartitioner(2))