13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -507,11 +507,14 @@ private[spark] class BlockManager(
       // Look for block on disk, potentially storing it back in memory if required
       if (level.useDisk) {
         logDebug(s"Getting block $blockId from disk")
-        val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-          case Some(b) => b
-          case None =>
-            throw new BlockException(
-              blockId, s"Block $blockId not found on disk, though it should be")
+        val bytes: ByteBuffer = if (diskStore.contains(blockId)) {
+          // DiskStore.getBytes() always returns Some, so this .get() is guaranteed to be safe
+          diskStore.getBytes(blockId).get
+        } else {
+          // Remove the missing block so that its unavailability is reported to the driver
+          removeBlock(blockId)
+          throw new BlockException(
+            blockId, s"Block $blockId not found on disk, though it should be")
         }
         assert(0 == bytes.position())

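The fix follows a general pattern worth naming: validate presence before dereferencing, and on a miss repair the bookkeeping before failing, so that a retry does not trip over the same stale metadata entry. A minimal sketch of that pattern in isolation; the names here are hypothetical and not part of the Spark API:

// Hypothetical sketch, not Spark code: read a value whose presence is tracked by
// separate metadata, purging the stale metadata entry when the backing data is gone.
def readOrPurge[V](
    id: String,
    contains: String => Boolean,
    read: String => Option[V],
    purge: String => Unit): V = {
  if (contains(id)) {
    // Assumed invariant (mirroring the DiskStore comment above): read() returns Some
    // for any id that contains() reports as present, so .get is safe here.
    read(id).get
  } else {
    // Purge first, so the failure is observed with consistent metadata.
    purge(id)
    throw new IllegalStateException(s"$id not found, though metadata said it should be")
  }
}

In BlockManager the purge step is removeBlock(blockId), which also reports the removal to the driver, so the scheduler stops routing tasks to the lost cached copy.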
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -17,6 +17,7 @@

 package org.apache.spark

+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable

 import java.io.{IOException, NotSerializableException, ObjectInputStream}
@@ -238,6 +239,17 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }

+  test("failure because cached RDD files are missing") {
+    sc = new SparkContext("local[1,2]", "test")
+    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
+    rdd.count()
+    // Directly delete all files from the disk store, triggering failures when reading cached data:
+    SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
+    // Each task should fail once due to missing cached data, but then should succeed on its second
+    // attempt because the missing cache locations will be purged and the blocks will be recomputed.
+    rdd.count()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }

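A note on the master URL in the new test: Spark's local mode accepts "local[N,M]", meaning N worker threads with maxTaskFailures = M, so "local[1,2]" grants each task exactly one retry. That retry is what lets the second count() succeed after the first attempt fails on the deleted files. A sketch of an equivalent explicit setup (SparkConf and SparkContext are standard Spark core classes; the app name is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

// "local[1,2]" parses as: 1 worker thread, maxTaskFailures = 2, i.e. one retry per task.
val conf = new SparkConf().setMaster("local[1,2]").setAppName("test")
val sc = new SparkContext(conf)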
41 changes: 41 additions & 0 deletions core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1361,4 +1361,45 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
     assert(result.data === Right(bytes))
     assert(result.droppedBlocks === Nil)
   }
+
+  private def testReadWithLossOfOnDiskFiles(
+      storageLevel: StorageLevel,
+      readMethod: BlockManager => Option[_]): Unit = {
+    store = makeBlockManager(12000)
+    assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel).nonEmpty)
+    assert(store.getStatus("blockId").isDefined)
+    // Directly delete all files from the disk store, triggering failures when reading blocks:
+    store.diskBlockManager.getAllFiles().foreach(_.delete())
+    // The BlockManager still thinks that these blocks exist:
+    assert(store.getStatus("blockId").isDefined)
+    // Because the BlockManager's metadata claims that the block exists (i.e. that it's present
+    // in at least one store), the read is attempted and fails when the on-disk file is missing.
+    intercept[BlockException] {
+      readMethod(store)
+    }
+    // A subsequent read attempt completes normally: the block isn't present, so we get the
+    // expected "block not found" (None) response rather than a fatal error:
+    assert(readMethod(store).isEmpty)
+    // The second read completes normally because the metadata entry for the missing block
+    // was removed as a result of the first read's failure:
+    assert(store.getStatus("blockId").isEmpty)
+  }
+
+  test("remove cached block if a read fails due to missing on-disk files") {
+    val storageLevels = Seq(
+      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
+    val readMethods = Map[String, BlockManager => Option[_]](
+      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
+      "getLocal" -> ((m: BlockManager) => m.getLocal("blockId"))
+    )
+    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
+    for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
+      withClue(s"$readMethodName $storageLevel") {
+        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
+      }
+    }
+  }
+
 }
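For readers mapping the loop's storage levels onto named constants: the serialized disk-only level is exactly StorageLevel.DISK_ONLY, while the deserialized disk-only variant has no named constant and has to be built through the factory method, as the test does. A small sketch, assuming (as in Spark core) that StorageLevel's equality compares the underlying flags:

import org.apache.spark.storage.StorageLevel

val diskSerialized = StorageLevel(useDisk = true, useMemory = false, deserialized = false)
val diskDeserialized = StorageLevel(useDisk = true, useMemory = false, deserialized = true)
// DISK_ONLY keeps serialized bytes on disk only, so it matches the first level:
assert(diskSerialized == StorageLevel.DISK_ONLY)

This is also why the standalone testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, ...) call overlaps with the first iteration of the loop over readMethods and storageLevels; the loop alone would already cover that combination.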