diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 68ed3aa5b062f..3a059d1b95b9e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -40,6 +40,7 @@ sealed abstract class BlockId {
   def isRDD: Boolean = isInstanceOf[RDDBlockId]
   def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId]
   def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
+  def isTemp: Boolean = isInstanceOf[TempLocalBlockId] || isInstanceOf[TempShuffleBlockId]
 
   override def toString: String = name
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index ee43b76e17010..d771997304989 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,18 +47,36 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
 
+  def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf)
+
+  /* Create container directories on YARN to persist the temporary files
+   * (temp_local, temp_shuffle).
+   * These files have no opportunity to be cleaned up before the application ends on YARN.
+   * This is a real issue, especially for long-lived Spark applications like the Spark thrift-server.
+   * So we persist these files in YARN container directories, which will be cleaned up by YARN when
+   * the container exits. */
+  private[spark] val containerDirs: Array[File] =
+    if (containerDirEnabled) createContainerDirs(conf) else Array.empty[File]
+
   private[spark] val localDirsString: Array[String] = localDirs.map(_.toString)
 
   // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  private val subContainerDirs = if (containerDirEnabled) {
+    Array.fill(containerDirs.length)(new Array[File](subDirsPerLocalDir))
+  } else {
+    Array.empty[Array[File]]
+  }
+
   private val shutdownHook = addShutdownHook()
 
-  /** Looks up a file by hashing it into one of our local subdirectories. */
+  /** Looks up a file by hashing it into one of our local/container subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
-  def getFile(filename: String): File = {
+  def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
+      subDirsPerLocalDir: Int, filename: String): File = {
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
     val dirId = hash % localDirs.length
@@ -82,17 +100,29 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     new File(subDir, filename)
   }
 
-  def getFile(blockId: BlockId): File = getFile(blockId.name)
+  /**
+   * Used only for testing.
+   */
+  private[spark] def getFile(filename: String): File =
+    getFile(localDirs, subDirs, subDirsPerLocalDir, filename)
+
+  def getFile(blockId: BlockId): File = {
+    if (containerDirEnabled && blockId.isTemp) {
+      getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name)
+    } else {
+      getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name)
+    }
+  }
 
   /** Check if disk block manager has a block. */
   def containsBlock(blockId: BlockId): Boolean = {
-    getFile(blockId.name).exists()
+    getFile(blockId).exists()
   }
 
   /** List all the files currently stored on disk by the disk manager. */
   def getAllFiles(): Seq[File] = {
     // Get all the files inside the array of array of directories
-    subDirs.flatMap { dir =>
+    (subDirs ++ subContainerDirs).flatMap { dir =>
       dir.synchronized {
         // Copy the content of dir because it may be modified in other threads
         dir.clone()
@@ -172,6 +202,27 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Create container directories for storing block data in YARN mode.
+   * These directories are located inside the configured local directories and
+   * will be deleted when YARN cleans up the container.
+   */
+  private def createContainerDirs(conf: SparkConf): Array[File] = {
+    Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
+      val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}"
+      try {
+        val containerDir = Utils.createDirectory(containerDirPath, "blockmgr")
+        logInfo(s"Created YARN container directory at $containerDir")
+        Some(containerDir)
+      } catch {
+        case e: IOException =>
+          logError(s"Failed to create YARN container dir in $containerDirPath." +
+            s" Ignoring this directory.", e)
+          None
+      }
+    }
+  }
+
   private def addShutdownHook(): AnyRef = {
     logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
@@ -194,15 +245,15 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   private def doStop(): Unit = {
     if (deleteFilesOnStop) {
-      localDirs.foreach { localDir =>
-        if (localDir.isDirectory() && localDir.exists()) {
+      (localDirs ++ containerDirs).foreach { dir =>
+        if (dir.isDirectory() && dir.exists()) {
           try {
-            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
-              Utils.deleteRecursively(localDir)
+            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(dir)) {
+              Utils.deleteRecursively(dir)
             }
           } catch {
             case e: Exception =>
-              logError(s"Exception while deleting local spark dir: $localDir", e)
+              logError(s"Exception while deleting local spark dir: $dir", e)
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index fbda4912e15ad..c6fb9af72a796 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -96,7 +96,7 @@ private[spark] class DiskStore(
   }
 
   def getBytes(blockId: BlockId): BlockData = {
-    getBytes(diskManager.getFile(blockId.name), getSize(blockId))
+    getBytes(diskManager.getFile(blockId), getSize(blockId))
   }
 
   def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match {
@@ -111,7 +111,7 @@ private[spark] class DiskStore(
 
   def remove(blockId: BlockId): Boolean = {
     blockSizes.remove(blockId)
-    val file = diskManager.getFile(blockId.name)
+    val file = diskManager.getFile(blockId)
     if (file.exists()) {
       val ret = file.delete()
       if (!ret) {
@@ -129,12 +129,12 @@ private[spark] class DiskStore(
    */
   def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId: BlockId): Unit = {
     blockSizes.put(targetBlockId, blockSize)
-    val targetFile = diskManager.getFile(targetBlockId.name)
+    val targetFile = diskManager.getFile(targetBlockId)
     FileUtils.moveFile(sourceFile, targetFile)
   }
 
   def contains(blockId: BlockId): Boolean = {
-    val file = diskManager.getFile(blockId.name)
+    val file = diskManager.getFile(blockId)
     file.exists()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index ccc525e854838..12297ec67e015 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
 
+import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -129,6 +130,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
       rootDir0.setExecutable(true)
       rootDir1.setExecutable(true)
     }
+  }
+
+  test("test write temp file into container dir") {
+    val conf = spy(testConf.clone)
+    val containerId = "container_e1987_1564558112805_31178_01_000131"
+    conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
+    when(conf.getenv("CONTAINER_ID")).thenReturn(containerId)
+    when(conf.getenv("LOCAL_DIRS")).thenReturn(System.getProperty("java.io.tmpdir"))
+    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
+    val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
+    val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
+    assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists")
+    assert(tempShuffleFile1.getAbsolutePath.contains(containerId))
+    assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists")
+    assert(tempLocalFile1.getAbsolutePath.contains(containerId))
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 97b9c973e97f2..75eb5fa343267 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -150,7 +150,7 @@ class DiskStoreSuite extends SparkFunSuite {
 
     assert(diskStore.getSize(blockId) === testData.length)
 
-    val diskData = Files.toByteArray(diskBlockManager.getFile(blockId.name))
+    val diskData = Files.toByteArray(diskBlockManager.getFile(blockId))
     assert(!Arrays.equals(testData, diskData))
 
     val blockData = diskStore.getBytes(blockId)
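
Reviewer note: the routing the patch introduces can be summarized with a small, self-contained sketch. This is an illustrative approximation only, not the Spark implementation; the hashing helper, the hex subdirectory layout, and the temp_local_/temp_shuffle_ name prefixes below are assumptions made for the example. Temp block names are resolved under container-scoped directories (deleted by YARN on container exit), while all other blocks keep resolving under the ordinary local directories.

import java.io.File

// Illustrative sketch only; not the actual DiskBlockManager code.
object BlockFileRoutingSketch {
  // Non-negative hash, mirroring the idea of spreading files across directories.
  private def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  // Hash a filename into one of `dirs`, then into one of `subDirsPerDir` subdirectories.
  def resolve(dirs: Array[File], subDirsPerDir: Int, filename: String): File = {
    val hash = nonNegativeHash(filename)
    val dirId = hash % dirs.length
    val subDirId = (hash / dirs.length) % subDirsPerDir
    new File(new File(dirs(dirId), "%02x".format(subDirId)), filename)
  }

  // Route temp blocks to container dirs when they exist; everything else stays local.
  def fileFor(
      localDirs: Array[File],
      containerDirs: Array[File],
      subDirsPerDir: Int,
      blockName: String): File = {
    val isTemp = blockName.startsWith("temp_local_") || blockName.startsWith("temp_shuffle_")
    if (isTemp && containerDirs.nonEmpty) resolve(containerDirs, subDirsPerDir, blockName)
    else resolve(localDirs, subDirsPerDir, blockName)
  }

  def main(args: Array[String]): Unit = {
    val localDirs = Array(new File("/tmp/spark-local-0"), new File("/tmp/spark-local-1"))
    val containerDirs = Array(new File("/tmp/yarn/container_01_000131/blockmgr"))
    // A temp shuffle file lands under the container directory...
    println(fileFor(localDirs, containerDirs, 64, "temp_shuffle_abc123"))
    // ...while an RDD block file keeps using the regular local directories.
    println(fileFor(localDirs, containerDirs, 64, "rdd_3_7"))
  }
}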