From 38f545e1be3f72ab22192a8f984206c1bf5dfb7b Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Mon, 9 Oct 2017 18:52:00 +0200
Subject: [PATCH 1/7] [SPARK-22227][CORE] DiskBlockManager.getAllBlocks now tolerates temp files

Prior to this commit, getAllBlocks implicitly assumed that the
directories managed by the DiskBlockManager contain only files
corresponding to valid block IDs. In reality this assumption was
violated during shuffle, which produces temporary files in the same
directory as the resulting blocks. As a result, calls to getAllBlocks
during shuffle were unreliable.

The fix could be made more efficient, but this is probably good enough.
---
 .../scala/org/apache/spark/storage/DiskBlockManager.scala | 6 +++++-
 .../org/apache/spark/storage/DiskBlockManagerSuite.scala  | 8 ++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 3d43e3c367aa..dc35673d16c4 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,6 +20,8 @@ package org.apache.spark.storage
 import java.io.{File, IOException}
 import java.util.UUID
 
+import scala.util.Try
+
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.Logging
@@ -100,7 +102,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
-    getAllFiles().map(f => BlockId(f.getName))
+    // SPARK-22227: the Try guards against temporary files written
+    // during shuffle which do not correspond to valid block IDs.
+    getAllFiles().flatMap(f => Try(BlockId(f.getName)).toOption)
   }
 
   /** Produces a unique block id and File suitable for storing local intermediate results. */
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 7859b0bba2b4..5e4c982349e1 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
+import java.util.UUID
 
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -79,6 +80,13 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     assert(diskBlockManager.getAllBlocks.toSet === ids.toSet)
   }
 
+  test("SPARK-22227: non-block files are skipped") {
+    val blockId = TempShuffleBlockId(UUID.randomUUID())
+    val file = diskBlockManager.getFile(blockId)
+    writeToFile(file, 10)
+    assert(diskBlockManager.getAllBlocks().isEmpty)
+  }
+
   def writeToFile(file: File, numBytes: Int) {
     val writer = new FileWriter(file, true)
    for (i <- 0 until numBytes) writer.write(i)

From 152b7340b8a746db3ac8fd57b03d9c992b3a30c8 Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Thu, 12 Oct 2017 18:48:49 +0200
Subject: [PATCH 2/7] Allow parsing Temp*BlockId in BlockId.apply

This commit also introduces a safe alternative to BlockId.apply that
returns an Option instead of throwing an exception.
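As a rough illustration of the parse-or-None pattern this commit describes, here is a minimal, self-contained sketch. It is not the actual Spark source: the two-type BlockId hierarchy, the object name, and the `parse` helper below are simplified stand-ins chosen just to make the snippet runnable.

```scala
import java.util.UUID
import scala.util.Try

object BlockIdParseSketch {
  // Simplified stand-ins for two of Spark's block ID types.
  sealed trait BlockId
  case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
  case class TempShuffleBlockId(uuid: UUID) extends BlockId

  private val RDD = "rdd_([0-9]+)_([0-9]+)".r
  private val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r

  // Unknown names map to None instead of throwing, so callers can
  // flatMap over a raw directory listing and silently drop junk files.
  def parse(name: String): Option[BlockId] = name match {
    case RDD(rddId, splitIndex) =>
      Some(RDDBlockId(rddId.toInt, splitIndex.toInt))
    case TEMP_SHUFFLE(uuid) =>
      // The Try guards against group matches that are not valid UUIDs.
      Try(TempShuffleBlockId(UUID.fromString(uuid))).toOption
    case _ =>
      None
  }

  def main(args: Array[String]): Unit = {
    println(parse("rdd_1_2"))                            // Some(RDDBlockId(1,2))
    println(parse(s"temp_shuffle_${UUID.randomUUID()}")) // Some(TempShuffleBlockId(...))
    println(parse("shuffle_0_0_0.data.b8b984da"))        // None: a shuffle temp file
  }
}
```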
---
 .../org/apache/spark/storage/BlockId.scala         | 56 ++++++++++++-------
 .../spark/storage/DiskBlockManager.scala           | 15 +++--
 2 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index a441baed2800..8b3abe887682 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -104,27 +104,43 @@ object BlockId {
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
+  val TEMP_LOCAL = "temp_local_([A-Fa-f-]+)".r
+  val TEMP_SHUFFLE = "temp_shuffle_([A-Fa-f-]+)".r
   val TEST = "test_(.*)".r
 
-  /** Converts a BlockId "name" String back into a BlockId. */
-  def apply(id: String): BlockId = id match {
-    case RDD(rddId, splitIndex) =>
-      RDDBlockId(rddId.toInt, splitIndex.toInt)
-    case SHUFFLE(shuffleId, mapId, reduceId) =>
-      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
-    case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
-      ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
-    case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
-      ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
-    case BROADCAST(broadcastId, field) =>
-      BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
-    case TASKRESULT(taskId) =>
-      TaskResultBlockId(taskId.toLong)
-    case STREAM(streamId, uniqueId) =>
-      StreamBlockId(streamId.toInt, uniqueId.toLong)
-    case TEST(value) =>
-      TestBlockId(value)
-    case _ =>
-      throw new IllegalStateException("Unrecognized BlockId: " + id)
+  def apply(name: String): BlockId = {
+    guess(name).getOrElse {
+      throw new IllegalStateException("Unrecognized BlockId: " + name)
+    }
+  }
+
+  /** Tries to guess block ID type by [[BlockId.name]]. */
+  private[storage] def guess(name: String): Option[BlockId] = {
+    val blockId = name match {
+      case RDD(rddId, splitIndex) =>
+        RDDBlockId(rddId.toInt, splitIndex.toInt)
+      case SHUFFLE(shuffleId, mapId, reduceId) =>
+        ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+      case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
+        ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+      case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
+        ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+      case BROADCAST(broadcastId, field) =>
+        BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
+      case TASKRESULT(taskId) =>
+        TaskResultBlockId(taskId.toLong)
+      case STREAM(streamId, uniqueId) =>
+        StreamBlockId(streamId.toInt, uniqueId.toLong)
+      case TEMP_LOCAL(uuid) =>
+        TempShuffleBlockId(UUID.fromString(uuid))
+      case TEMP_SHUFFLE(uuid) =>
+        TempShuffleBlockId(UUID.fromString(uuid))
+      case TEST(value) =>
+        TestBlockId(value)
+      case _ =>
+        null
+    }
+
+    Option(blockId)
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index dc35673d16c4..7e1df4127ded 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -20,8 +20,6 @@ package org.apache.spark.storage
 import java.io.{File, IOException}
 import java.util.UUID
 
-import scala.util.Try
-
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.Logging
@@ -102,9 +100,16 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
-    // SPARK-22227: the Try guards against temporary files written
-    // during shuffle which do not correspond to valid block IDs.
-    getAllFiles().flatMap(f => Try(BlockId(f.getName)).toOption)
+    getAllFiles().flatMap { f =>
+      val blockId = BlockId.guess(f.getName)
+      if (blockId.isEmpty) {
+        // This does not handle the special case of temporary files
+        // created by [[SortShuffleWriter]].
+        log.warn(s"Encountered an unexpected file in a managed directory: $f")
+      }
+
+      blockId
+    }
   }
 
   /** Produces a unique block id and File suitable for storing local intermediate results. */
From 0da650a5a4dde12ddbdfd54eccdc088906b91e8b Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Mon, 23 Oct 2017 11:48:23 +0200
Subject: [PATCH 3/7] Removed BlockId.guess as suggested in the comments

---
 .../org/apache/spark/storage/BlockId.scala         | 59 ++++++++-----------
 .../spark/storage/DiskBlockManager.scala           | 15 ++---
 2 files changed, 33 insertions(+), 41 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 8b3abe887682..5a083f65eb34 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -95,6 +95,8 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
   override def name: String = "test_" + id
 }
 
+class UnrecognizedBlockId(name: String) extends Exception
+
 @DeveloperApi
 object BlockId {
   val RDD = "rdd_([0-9]+)_([0-9]+)".r
@@ -108,39 +110,28 @@ object BlockId {
   val TEMP_SHUFFLE = "temp_shuffle_([A-Fa-f-]+)".r
   val TEST = "test_(.*)".r
 
-  def apply(name: String): BlockId = {
-    guess(name).getOrElse {
-      throw new IllegalStateException("Unrecognized BlockId: " + name)
-    }
-  }
-
-  /** Tries to guess block ID type by [[BlockId.name]]. */
-  private[storage] def guess(name: String): Option[BlockId] = {
-    val blockId = name match {
-      case RDD(rddId, splitIndex) =>
-        RDDBlockId(rddId.toInt, splitIndex.toInt)
-      case SHUFFLE(shuffleId, mapId, reduceId) =>
-        ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
-      case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
-        ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
-      case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
-        ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
-      case BROADCAST(broadcastId, field) =>
-        BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
-      case TASKRESULT(taskId) =>
-        TaskResultBlockId(taskId.toLong)
-      case STREAM(streamId, uniqueId) =>
-        StreamBlockId(streamId.toInt, uniqueId.toLong)
-      case TEMP_LOCAL(uuid) =>
-        TempShuffleBlockId(UUID.fromString(uuid))
-      case TEMP_SHUFFLE(uuid) =>
-        TempShuffleBlockId(UUID.fromString(uuid))
-      case TEST(value) =>
-        TestBlockId(value)
-      case _ =>
-        null
-    }
-
-    Option(blockId)
+  def apply(name: String): BlockId = name match {
+    case RDD(rddId, splitIndex) =>
+      RDDBlockId(rddId.toInt, splitIndex.toInt)
+    case SHUFFLE(shuffleId, mapId, reduceId) =>
+      ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
+      ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case SHUFFLE_INDEX(shuffleId, mapId, reduceId) =>
+      ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
+    case BROADCAST(broadcastId, field) =>
+      BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
+    case TASKRESULT(taskId) =>
+      TaskResultBlockId(taskId.toLong)
+    case STREAM(streamId, uniqueId) =>
+      StreamBlockId(streamId.toInt, uniqueId.toLong)
+    case TEMP_LOCAL(uuid) =>
+      TempShuffleBlockId(UUID.fromString(uuid))
+    case TEMP_SHUFFLE(uuid) =>
+      TempShuffleBlockId(UUID.fromString(uuid))
+    case TEST(value) =>
+      TestBlockId(value)
+    case _ =>
+      throw new UnrecognizedBlockId(name)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7e1df4127ded..9c1efeae898d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -101,14 +101,15 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   /** List all the blocks currently stored on disk by the disk manager. */
   def getAllBlocks(): Seq[BlockId] = {
     getAllFiles().flatMap { f =>
-      val blockId = BlockId.guess(f.getName)
-      if (blockId.isEmpty) {
-        // This does not handle the special case of temporary files
-        // created by [[SortShuffleWriter]].
-        log.warn(s"Encountered an unexpected file in a managed directory: $f")
+      try {
+        Some(BlockId(f.getName))
+      } catch {
+        case _: UnrecognizedBlockId =>
+          // This does not handle the special case of temporary files
+          // created by [[SortShuffleWriter]].
+          log.warn(s"Encountered an unexpected file in a managed directory: $f")
+          None
       }
-
-      blockId
     }
   }

From 87a4ca1b68772cfc751fbbbd449100d0bf71dd4b Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Tue, 24 Oct 2017 11:11:29 +0200
Subject: [PATCH 4/7] Addressed another round of reviews

---
 core/src/main/scala/org/apache/spark/storage/BlockId.scala | 4 +++-
 .../scala/org/apache/spark/storage/DiskBlockManager.scala  | 5 ++---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 5a083f65eb34..d2ea9c258a17 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.util.UUID
 
+import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -95,7 +96,8 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
   override def name: String = "test_" + id
 }
 
-class UnrecognizedBlockId(name: String) extends Exception
+class UnrecognizedBlockId(name: String)
+  extends SparkException(s"Failed to parse $name into a block ID")
 
 @DeveloperApi
 object BlockId {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 9c1efeae898d..a69bcc925999 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -105,9 +105,8 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
         Some(BlockId(f.getName))
       } catch {
         case _: UnrecognizedBlockId =>
-          // This does not handle the special case of temporary files
-          // created by [[SortShuffleWriter]].
-          log.warn(s"Encountered an unexpected file in a managed directory: $f")
+          // Skip files which do not correspond to blocks, for example temporary
+          // files created by [[SortShuffleWriter]].
           None
       }
     }

From b794484698cef0e12468a37996870eee995cde41 Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Tue, 24 Oct 2017 18:58:44 +0200
Subject: [PATCH 5/7] Fixed failing test in BlockIdSuite

---
 core/src/main/scala/org/apache/spark/storage/BlockId.scala | 1 +
 .../test/scala/org/apache/spark/storage/BlockIdSuite.scala | 7 +------
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index d2ea9c258a17..df98e6448c6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -96,6 +96,7 @@ private[spark] case class TestBlockId(id: String) extends BlockId {
   override def name: String = "test_" + id
 }
 
+@DeveloperApi
 class UnrecognizedBlockId(name: String)
   extends SparkException(s"Failed to parse $name into a block ID")
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index f0c521b00b58..d6a1463260b6 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -35,13 +35,8 @@ class BlockIdSuite extends SparkFunSuite {
   }
 
   test("test-bad-deserialization") {
-    try {
-      // Try to deserialize an invalid block id.
+    intercept[UnrecognizedBlockId] {
       BlockId("myblock")
-      fail()
-    } catch {
-      case e: IllegalStateException => // OK
-      case _: Throwable => fail()
     }
   }

From ff9a6aed09074846e425eb614182cd6fc5ef74e7 Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Tue, 24 Oct 2017 19:31:09 +0200
Subject: [PATCH 6/7] Updated BlockIdSuite with parsing tests for temp blocks

---
 core/src/main/scala/org/apache/spark/storage/BlockId.scala | 6 +++---
 .../test/scala/org/apache/spark/storage/BlockIdSuite.scala | 2 ++
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index df98e6448c6b..7ac2c71c18eb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -109,8 +109,8 @@ object BlockId {
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
-  val TEMP_LOCAL = "temp_local_([A-Fa-f-]+)".r
-  val TEMP_SHUFFLE = "temp_shuffle_([A-Fa-f-]+)".r
+  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
+  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r
 
   def apply(name: String): BlockId = name match {
@@ -129,7 +129,7 @@ object BlockId {
     case STREAM(streamId, uniqueId) =>
       StreamBlockId(streamId.toInt, uniqueId.toLong)
     case TEMP_LOCAL(uuid) =>
-      TempShuffleBlockId(UUID.fromString(uuid))
+      TempLocalBlockId(UUID.fromString(uuid))
     case TEMP_SHUFFLE(uuid) =>
       TempShuffleBlockId(UUID.fromString(uuid))
     case TEST(value) =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
index d6a1463260b6..ff4755833a91 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockIdSuite.scala
@@ -134,6 +134,7 @@ class BlockIdSuite extends SparkFunSuite {
     assert(id.id.getMostSignificantBits() === 5)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }
 
   test("temp shuffle") {
@@ -146,6 +147,7 @@ class BlockIdSuite extends SparkFunSuite {
     assert(id.id.getMostSignificantBits() === 1)
     assert(id.id.getLeastSignificantBits() === 2)
     assert(!id.isShuffle)
+    assertSame(id, BlockId(id.toString))
   }
 
   test("test") {

From 2b6134708e1771546c3b2210b9471b51e20d4cd1 Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Wed, 25 Oct 2017 14:14:46 +0200
Subject: [PATCH 7/7] Fixed DiskBlockManagerSuite

---
 .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 5e4c982349e1..0c4f3c48ef80 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -81,8 +81,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   }
 
   test("SPARK-22227: non-block files are skipped") {
-    val blockId = TempShuffleBlockId(UUID.randomUUID())
-    val file = diskBlockManager.getFile(blockId)
+    val file = diskBlockManager.getFile("unmanaged_file")
     writeToFile(file, 10)
     assert(diskBlockManager.getAllBlocks().isEmpty)
   }
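Taken together, the series converges on a narrow, typed failure path: BlockId.apply throws UnrecognizedBlockId, and getAllBlocks drops anything it cannot parse instead of failing. The following is a condensed, self-contained sketch of that final shape, assuming a stand-in parser and an in-memory file list; the object name and `parseBlockId` helper are hypothetical and this is not a drop-in replacement for DiskBlockManager:

```scala
import java.io.File

// Mirrors the final UnrecognizedBlockId from the series, minus the
// SparkException parent, so the sketch compiles without Spark on the classpath.
class UnrecognizedBlockId(name: String)
  extends Exception(s"Failed to parse $name into a block ID")

object GetAllBlocksSketch {
  // Stand-in for BlockId.apply: accepts one well-known prefix, rejects the rest.
  def parseBlockId(name: String): String =
    if (name.startsWith("rdd_")) name
    else throw new UnrecognizedBlockId(name)

  // Mirrors the final getAllBlocks(): unparseable names are skipped, not fatal.
  def getAllBlocks(files: Seq[File]): Seq[String] =
    files.flatMap { f =>
      try {
        Some(parseBlockId(f.getName))
      } catch {
        case _: UnrecognizedBlockId =>
          // Skip files which do not correspond to blocks, e.g. shuffle temp files.
          None
      }
    }

  def main(args: Array[String]): Unit = {
    val files = Seq(new File("rdd_1_2"), new File("shuffle_0_0_0.data.b8b984da"))
    println(getAllBlocks(files)) // List(rdd_1_2)
  }
}
```

Catching the dedicated UnrecognizedBlockId type, rather than a broad Throwable or IllegalStateException, is what lets genuine bugs (for example a corrupted directory listing) still surface while expected junk is ignored.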