
Commit 1551a72

JoshRosen authored and Andrew Or committed
[SPARK-15736][CORE] Gracefully handle loss of DiskStore files
If an RDD partition is cached on disk and the DiskStore file is lost, then reads of that cached partition will fail and the missing partition is supposed to be recomputed by a new task attempt. In the current BlockManager implementation, however, the missing file does not trigger any metadata update or cache invalidation, so subsequent task attempts are scheduled on the same executor and the doomed read is retried over and over, leading to repeated task failures and eventually a total job failure.

To fix this problem, the executor with the missing file needs to mark the corresponding block as missing so that it stops advertising itself as a cache location for that block. This patch fixes this bug and adds an end-to-end regression test (in `FailureSuite`) and a set of unit tests (in `BlockManagerSuite`).

Author: Josh Rosen <[email protected]>

Closes #13473 from JoshRosen/handle-missing-cache-files.

(cherry picked from commit 229f902)
Signed-off-by: Andrew Or <[email protected]>
1 parent 0802ff9 · commit 1551a72

File tree (3 files changed: 66 additions, 6 deletions)

core/src/main/scala/org/apache/spark/storage/BlockManager.scala
core/src/test/scala/org/apache/spark/FailureSuite.scala
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
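
To make the failure mode concrete before diving into the diffs, here is a minimal sketch of the scenario being fixed. It mirrors the `FailureSuite` regression test added below and assumes an existing local `SparkContext` named `sc` created with master `"local[1,2]"` (one worker thread, up to two attempts per task); like the test, it reaches into Spark internals (`SparkEnv.get.blockManager.diskBlockManager`), so it is illustrative rather than a user-facing API example.

    import org.apache.spark.SparkEnv
    import org.apache.spark.storage.StorageLevel

    // Cache a small RDD on disk only, then simulate loss of the DiskStore files.
    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
    rdd.count()  // materializes the on-disk cache
    SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())

    // Before this patch: each retry re-reads the missing file on the same executor and the job fails.
    // After this patch: the first failed read removes the block and reports it to the driver,
    // so the next attempt recomputes the partition instead of re-reading the missing file.
    rdd.count()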

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 14 additions & 6 deletions
@@ -402,6 +402,17 @@ private[spark] class BlockManager(
     locations
   }
 
+  /**
+   * Cleanup code run in response to a failed local read.
+   * Must be called while holding a read lock on the block.
+   */
+  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
+    releaseLock(blockId)
+    // Remove the missing block so that its unavailability is reported to the driver
+    removeBlock(blockId)
+    throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+  }
+
   /**
    * Get block from local block manager as an iterator of Java objects.
    */
@@ -441,8 +452,7 @@
         val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
         Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
       } else {
-        releaseLock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     }
   }
@@ -489,8 +499,7 @@
         // The block was not found on disk, so serialize an in-memory copy:
         serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
-        releaseLock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     } else { // storage level is serialized
       if (level.useMemory && memoryStore.contains(blockId)) {
@@ -499,8 +508,7 @@
         val diskBytes = diskStore.getBytes(blockId)
         maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
       } else {
-        releaseLock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     }
   }

core/src/test/scala/org/apache/spark/FailureSuite.scala

Lines changed: 12 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io.{IOException, NotSerializableException, ObjectInputStream}
 
 import org.apache.spark.memory.TestMemoryConsumer
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable
 
 // Common state shared by FailureSuite-launched tasks. We use a global object
@@ -241,6 +242,17 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("failure because cached RDD partitions are missing from DiskStore (SPARK-15736)") {
+    sc = new SparkContext("local[1,2]", "test")
+    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
+    rdd.count()
+    // Directly delete all files from the disk store, triggering failures when reading cached data:
+    SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
+    // Each task should fail once due to missing cached data, but then should succeed on its second
+    // attempt because the missing cache locations will be purged and the blocks will be recomputed.
+    rdd.count()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 40 additions & 0 deletions
@@ -1139,6 +1139,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getSingle("a3").isDefined, "a3 was not in store")
   }
 
+  private def testReadWithLossOfOnDiskFiles(
+      storageLevel: StorageLevel,
+      readMethod: BlockManager => Option[_]): Unit = {
+    store = makeBlockManager(12000)
+    assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
+    assert(store.getStatus("blockId").isDefined)
+    // Directly delete all files from the disk store, triggering failures when reading blocks:
+    store.diskBlockManager.getAllFiles().foreach(_.delete())
+    // The BlockManager still thinks that these blocks exist:
+    assert(store.getStatus("blockId").isDefined)
+    // Because the BlockManager's metadata claims that the block exists (i.e. that it's present
+    // in at least one store), the read attempts to read it and fails when the on-disk file is
+    // missing.
+    intercept[SparkException] {
+      readMethod(store)
+    }
+    // Subsequent read attempts will succeed; the block isn't present but we return an expected
+    // "block not found" response rather than a fatal error:
+    assert(readMethod(store).isEmpty)
+    // The reason why this second read succeeded is because the metadata entry for the missing
+    // block was removed as a result of the read failure:
+    assert(store.getStatus("blockId").isEmpty)
+  }
+
+  test("remove block if a read fails due to missing DiskStore files (SPARK-15736)") {
+    val storageLevels = Seq(
+      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
+    val readMethods = Map[String, BlockManager => Option[_]](
+      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
+      "getLocalValues" -> ((m: BlockManager) => m.getLocalValues("blockId"))
+    )
+    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
+    for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
+      withClue(s"$readMethodName $storageLevel") {
+        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
+      }
+    }
+  }
+
   test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
     val mockBlockTransferService =
       new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))
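
The unit tests above pin down the local-read contract: the first read of a block whose on-disk file has vanished throws, later reads return a clean "not found", and the local status entry disappears. The commit message also claims the executor stops advertising itself as a cache location; since `removeBlock` reports the removal to the driver, one would expect the driver-side location list to empty out as well. A hedged sketch of such an additional assertion, reusing the suite's `master` field and its string-to-`BlockId` implicit (this check is not part of the commit):

    // Hypothetical follow-up check inside BlockManagerSuite (not in this commit): after the failed
    // read removes the block, the driver's BlockManagerMaster should no longer list this executor
    // as a location for it.
    store = makeBlockManager(12000)
    store.putSingle("blockId", new Array[Byte](4000), StorageLevel.DISK_ONLY)
    store.diskBlockManager.getAllFiles().foreach(_.delete())
    intercept[SparkException] {
      store.getLocalBytes("blockId")
    }
    assert(store.getStatus("blockId").isEmpty)      // local metadata removed
    assert(master.getLocations("blockId").isEmpty)  // driver-side location dropped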
