@@ -199,8 +199,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {

     val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
     val finalOutputPath = new Path(outputDir, finalOutputName)
-    val tempOutputPath =
-      new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
+    val tempOutputPath = new Path(outputDir,
+      s".$finalOutputName-attempt-${ctx.stageAttemptNumber()}-${ctx.attemptNumber()}")
Member:

stageAttemptNumber isn't unique among stages and attemptNumber isn't unique among tasks within the same stage, so it seems this could still lead to a file name conflict. E.g. task 0.0 from stage 0.0 could conflict with task 1.0 from stage 1.0 (different stage), and task 1.0 from stage 1.0 could conflict with task 2.0 from stage 1.0 (same stage).

I think the unique file format should be ...-stageId-stageAttemptId-taskId-taskAttemptId-....
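
A minimal sketch of the fully-qualified name proposed here, reading taskId as the partition index and taskAttemptId as the per-task attempt number; all four accessors exist on TaskContext, but the exact format is only an illustration, not code from this PR:

    // Sketch only: every component comes from the running task's TaskContext,
    // so no two task attempts in one application build the same temp name.
    val ctx = TaskContext.get()
    val tempOutputName =
      s".$finalOutputName-${ctx.stageId()}-${ctx.stageAttemptNumber()}" +
        s"-${ctx.partitionId()}-${ctx.attemptNumber()}"
    val tempOutputPath = new Path(outputDir, tempOutputName)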

Member Author:

Can an RDD span multiple stages?

Member:

No, but all attempt IDs start from 0.

Member:

Oh, I see, we suffix the checkpoint path with the RDD id...

Contributor:

If we just want a unique file name, can we use the task id? It's unique within the Spark application.

Member Author:

Do you mean taskAttemptId?

Member:

Yes, taskAttemptId. We also use it in the shuffle map file to make the file name unique. #24892 (comment)
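
For reference, a sketch of what naming by taskAttemptId could look like, using the TaskContext (ctx) already in scope in this method; TaskContext.taskAttemptId() returns a Long that is unique across all task attempts in a Spark application. The format below is only an assumption, not necessarily what the follow-up ends up using:

    // Sketch only: taskAttemptId() is unique application-wide, so neither a
    // stage re-attempt nor a task re-attempt can reuse an earlier temp name.
    val tempOutputPath = new Path(outputDir,
      s".$finalOutputName-attempt-${ctx.taskAttemptId()}")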

Member Author:

OK. Let me create a follow-up for it. Thanks.


     val bufferSize = env.conf.get(BUFFER_SIZE)

@@ -218,11 +218,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }
     val serializer = env.serializer.newInstance()
     val serializeStream = serializer.serializeStream(fileOutputStream)
-    Utils.tryWithSafeFinally {
+    Utils.tryWithSafeFinallyAndFailureCallbacks {
       serializeStream.writeAll(iterator)
-    } {
+    } (catchBlock = {
+      val deleted = fs.delete(tempOutputPath, false)
+      if (!deleted) {
+        logInfo(s"Failed to delete tempOutputPath $tempOutputPath.")
+      }
+    }, finallyBlock = {
       serializeStream.close()
-    }
+    })

     if (!fs.rename(tempOutputPath, finalOutputPath)) {
       if (!fs.exists(finalOutputPath)) {
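The switch from Utils.tryWithSafeFinally to Utils.tryWithSafeFinallyAndFailureCallbacks above gives the writer a hook that runs only when writing fails (delete the half-written temp file) plus a block that always runs (close the stream). A hedged sketch of that control flow, not Spark's actual implementation, which also wires in task failure callbacks:

    // Sketch of the behaviour relied on above, not the real Utils method:
    // catchBlock runs only if block throws; finallyBlock runs on every exit.
    def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
        (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
      try {
        block                          // e.g. serializeStream.writeAll(iterator)
      } catch {
        case t: Throwable =>
          try catchBlock               // e.g. delete the partially written temp file
          catch { case s: Throwable => t.addSuppressed(s) }
          throw t
      } finally {
        finallyBlock                   // e.g. serializeStream.close()
      }
    }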
core/src/test/scala/org/apache/spark/CheckpointSuite.scala (26 additions, 0 deletions)
@@ -28,6 +28,7 @@ import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_T
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.rdd._
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils

@@ -642,4 +643,29 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
       assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partiton))
     }
   }
+
+  test("SPARK-31484: checkpoint should not fail in retry") {
+    withTempDir { checkpointDir =>
+      val conf = new SparkConf()
+        .set(UI_ENABLED.key, "false")
+      sc = new SparkContext("local[1]", "test", conf)
+      sc.setCheckpointDir(checkpointDir.toString)
+      val rdd = sc.makeRDD(1 to 200, numSlices = 4).repartition(1).mapPartitions { iter =>
+        iter.map { i =>
+          if (i > 100 && TaskContext.get().stageAttemptNumber() == 0) {
+            // throw new SparkException("Make first attempt failed.")
+            // Throw FetchFailedException to explicitly trigger stage resubmission.
+            // A normal exception will only trigger task resubmission in the same stage.
+            throw new FetchFailedException(null, 0, 0L, 0, 0, "Fake")
+          } else {
+            i
+          }
+        }
+      }
+      rdd.checkpoint()
+      assert(rdd.collect().toSeq === (1 to 200))
+      // Verify that the RDD is checkpointed
+      assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
+    }
+  }
 }