Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.storage

import java.util.UUID

import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi

/**
Expand Down Expand Up @@ -95,6 +96,10 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
override def name: String = "test_" + id
}

/**
 * Signals that a string could not be parsed into a block ID.
 *
 * @param name the string that failed to parse; it is embedded in the exception message
 */
@DeveloperApi
class UnrecognizedBlockId(name: String)
  extends SparkException(s"Failed to parse $name into a block ID")

@DeveloperApi
object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
Expand All @@ -104,10 +109,11 @@ object BlockId {
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r

/** Converts a BlockId "name" String back into a BlockId. */
def apply(id: String): BlockId = id match {
def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) =>
RDDBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) =>
Expand All @@ -122,9 +128,13 @@ object BlockId {
TaskResultBlockId(taskId.toLong)
case STREAM(streamId, uniqueId) =>
StreamBlockId(streamId.toInt, uniqueId.toLong)
case TEMP_LOCAL(uuid) =>
TempLocalBlockId(UUID.fromString(uuid))
case TEMP_SHUFFLE(uuid) =>
TempShuffleBlockId(UUID.fromString(uuid))
case TEST(value) =>
TestBlockId(value)
case _ =>
throw new IllegalStateException("Unrecognized BlockId: " + id)
throw new UnrecognizedBlockId(name)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea

/**
 * List all the blocks currently stored on disk by the disk manager.
 *
 * Files whose names cannot be parsed into a [[BlockId]] are left out of the result
 * instead of failing the whole listing.
 *
 * @return the block IDs of every file whose name parses as a block ID
 */
def getAllBlocks(): Seq[BlockId] = {
  getAllFiles().flatMap { f =>
    try {
      Some(BlockId(f.getName))
    } catch {
      case _: UnrecognizedBlockId =>
        // Skip files which do not correspond to blocks, for example temporary
        // files created by [[SortShuffleWriter]].
        None
    }
  }
}

/** Produces a unique block id and File suitable for storing local intermediate results. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,8 @@ class BlockIdSuite extends SparkFunSuite {
}

test("test-bad-deserialization") {
  // Deserializing a name that is not a block ID must raise UnrecognizedBlockId.
  // intercept fails the test by itself when nothing (or a different exception) is
  // thrown, so no explicit fail() or surrounding try/catch is needed.
  intercept[UnrecognizedBlockId] {
    BlockId("myblock")
  }
}

Expand Down Expand Up @@ -139,6 +134,7 @@ class BlockIdSuite extends SparkFunSuite {
assert(id.id.getMostSignificantBits() === 5)
assert(id.id.getLeastSignificantBits() === 2)
assert(!id.isShuffle)
assertSame(id, BlockId(id.toString))
}

test("temp shuffle") {
Expand All @@ -151,6 +147,7 @@ class BlockIdSuite extends SparkFunSuite {
assert(id.id.getMostSignificantBits() === 1)
assert(id.id.getLeastSignificantBits() === 2)
assert(!id.isShuffle)
assertSame(id, BlockId(id.toString))
}

test("test") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.storage

import java.io.{File, FileWriter}
import java.util.UUID

import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

Expand Down Expand Up @@ -79,6 +80,12 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
}

test("SPARK-22227: non-block files are skipped") {
  // Create a file through the disk manager under a name that is not meant to be a block ID.
  val unmanaged = diskBlockManager.getFile("unmanaged_file")
  writeToFile(unmanaged, 10)

  // Listing blocks should leave that file out rather than report it (or throw).
  val listedBlocks = diskBlockManager.getAllBlocks()
  assert(listedBlocks.isEmpty)
}

def writeToFile(file: File, numBytes: Int) {
val writer = new FileWriter(file, true)
for (i <- 0 until numBytes) writer.write(i)
Expand Down