From 6dfd454ba36b120d05317bfd984847f14e1ab87f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 18 Apr 2020 14:14:07 -0700 Subject: [PATCH 1/3] Add stage attempt number to temp checkpoint filename. --- .../apache/spark/rdd/ReliableCheckpointRDD.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index a5c07c07e8f2b..b29b56b351f02 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -199,8 +199,8 @@ private[spark] object ReliableCheckpointRDD extends Logging { val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId()) val finalOutputPath = new Path(outputDir, finalOutputName) - val tempOutputPath = - new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}") + val tempOutputPath = new Path(outputDir, + s".$finalOutputName-attempt-${ctx.stageAttemptNumber()}-${ctx.attemptNumber()}") val bufferSize = env.conf.get(BUFFER_SIZE) @@ -218,11 +218,16 @@ private[spark] object ReliableCheckpointRDD extends Logging { } val serializer = env.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) - Utils.tryWithSafeFinally { + Utils.tryWithSafeFinallyAndFailureCallbacks { serializeStream.writeAll(iterator) - } { + } (catchBlock = { + val deleted = fs.delete(tempOutputPath, false) + if (!deleted) { + logInfo(s"Failed to delete tempOutputPath $tempOutputPath.") + } + }, finallyBlock = { serializeStream.close() - } + }) if (!fs.rename(tempOutputPath, finalOutputPath)) { if (!fs.exists(finalOutputPath)) { From da210ea809c3e37e51ef1cc1bafe5e7f9fe2c2ca Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 18 Apr 2020 15:31:08 -0700 Subject: [PATCH 2/3] Add test. --- .../org/apache/spark/CheckpointSuite.scala | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 6a108a55045ee..62cd8c022a609 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_T import org.apache.spark.internal.config.UI._ import org.apache.spark.io.CompressionCodec import org.apache.spark.rdd._ +import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId} import org.apache.spark.util.Utils @@ -642,4 +643,29 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext { assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partiton)) } } + + test("checkpoint should not fail in retry") { + withTempDir { checkpointDir => + val conf = new SparkConf() + .set(UI_ENABLED.key, "false") + sc = new SparkContext("local[1]", "test", conf) + sc.setCheckpointDir(checkpointDir.toString) + val rdd = sc.makeRDD(1 to 200, numSlices = 4).repartition(1).mapPartitions { iter => + iter.map { i => + if (i > 100 && TaskContext.get().stageAttemptNumber() == 0) { + // throw new SparkException("Make first attemp failed.") + // Throw FetchFailedException to explicitly trigger stage resubmission. + // A normal exception will only trigger task resubmission in the same stage. + throw new FetchFailedException(null, 0, 0L, 0, 0, "Fake") + } else { + i + } + } + } + rdd.checkpoint() + assert(rdd.collect().toSeq === (1 to 200)) + // Verify that RDD is checkpointed + assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]]) + } + } } From f402180974408bf3a22e80081eeed57793ad4830 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 18 Apr 2020 22:16:17 -0700 Subject: [PATCH 3/3] Add JIRA prefix. --- core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index 62cd8c022a609..a69381d18e3b6 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -644,7 +644,7 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext { } } - test("checkpoint should not fail in retry") { + test("SPARK-31484: checkpoint should not fail in retry") { withTempDir { checkpointDir => val conf = new SparkConf() .set(UI_ENABLED.key, "false")