1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -40,6 +40,7 @@ sealed abstract class BlockId {
def isRDD: Boolean = isInstanceOf[RDDBlockId]
def isShuffle: Boolean = isInstanceOf[ShuffleBlockId] || isInstanceOf[ShuffleBlockBatchId]
def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
def isTemp: Boolean = isInstanceOf[TempLocalBlockId] || isInstanceOf[TempShuffleBlockId]

override def toString: String = name
}
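
For reference, a minimal sketch of what the new predicate matches (ad-hoc Scala, not part of this diff; the IDs are constructed purely for illustration):

    import java.util.UUID

    // The two kinds of scratch blocks written during execution are temp blocks.
    val tempShuffle = TempShuffleBlockId(UUID.randomUUID())
    val tempLocal = TempLocalBlockId(UUID.randomUUID())
    assert(tempShuffle.isTemp && tempLocal.isTemp)

    // Everything else (RDD, shuffle, broadcast blocks) stays isTemp == false.
    assert(!RDDBlockId(rddId = 0, splitIndex = 0).isTemp)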
71 changes: 61 additions & 10 deletions core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,18 +47,36 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}

def containerDirEnabled: Boolean = Utils.isRunningInYarnContainer(conf)

/* Create container directories on YARN to persist temporary files
 * (temp_local, temp_shuffle).
 * These files otherwise have no opportunity to be cleaned up before the application
 * ends on YARN, which is a real issue for long-lived Spark applications such as the
 * Spark Thrift Server. So we persist them in YARN container directories, which YARN
 * cleans up when the container exits. */
private[spark] val containerDirs: Array[File] =
if (containerDirEnabled) createContainerDirs(conf) else Array.empty[File]

private[spark] val localDirsString: Array[String] = localDirs.map(_.toString)

// The content of subDirs is immutable, but the content of subDirs(i) is mutable and
// is protected by the lock of subDirs(i)
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

private val subContainerDirs = if (containerDirEnabled) {
Array.fill(containerDirs.length)(new Array[File](subDirsPerLocalDir))
} else {
Array.empty[Array[File]]
}

private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local subdirectories. */
/** Looks up a file by hashing it into one of our local/container subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.ExecutorDiskUtils#getFile().
def getFile(filename: String): File = {
def getFile(localDirs: Array[File], subDirs: Array[Array[File]],
subDirsPerLocalDir: Int, filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)
val dirId = hash % localDirs.length
@@ -82,17 +100,29 @@
new File(subDir, filename)
}
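
The body elided above routes a filename to a directory pair by hashing. A standalone sketch of that scheme, mirroring Utils.nonNegativeHash (the real method additionally creates the chosen subdirectory lazily under its lock):

    // Map a filename to (root dir index, subdir index), as getFile does.
    def nonNegativeHash(s: String): Int = {
      val h = s.hashCode
      if (h != Int.MinValue) math.abs(h) else 0
    }

    def route(filename: String, numDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
      val hash = nonNegativeHash(filename)
      val dirId = hash % numDirs                            // which root directory
      val subDirId = (hash / numDirs) % subDirsPerLocalDir  // which subdirectory within it
      (dirId, subDirId)
    }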

def getFile(blockId: BlockId): File = getFile(blockId.name)
/**
* Used only for testing.
*/
private[spark] def getFile(filename: String): File =
getFile(localDirs, subDirs, subDirsPerLocalDir, filename)

def getFile(blockId: BlockId): File = {
if (containerDirEnabled && blockId.isTemp) {
getFile(containerDirs, subContainerDirs, subDirsPerLocalDir, blockId.name)
} else {
getFile(localDirs, subDirs, subDirsPerLocalDir, blockId.name)
}
}
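
The net effect, sketched against a hypothetical manager dbm running inside a YARN container (illustrative only; this mirrors the assertions in the new test below):

    // Temp blocks resolve under the container directory; regular blocks do not.
    val (_, tempFile) = dbm.createTempShuffleBlock()
    assert(tempFile.getAbsolutePath.contains(sys.env("CONTAINER_ID")))

    val rddFile = dbm.getFile(RDDBlockId(rddId = 0, splitIndex = 0))
    assert(!rddFile.getAbsolutePath.contains(sys.env("CONTAINER_ID")))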

/** Check if disk block manager has a block. */
def containsBlock(blockId: BlockId): Boolean = {
getFile(blockId.name).exists()
getFile(blockId).exists()
}

/** List all the files currently stored on disk by the disk manager. */
def getAllFiles(): Seq[File] = {
// Get all the files inside the array of array of directories
subDirs.flatMap { dir =>
(subDirs ++ subContainerDirs).flatMap { dir =>
dir.synchronized {
// Copy the content of dir because it may be modified in other threads
dir.clone()
@@ -172,6 +202,27 @@
}
}

/**
 * Create container directories for storing block data in YARN mode.
 * These directories are located inside the configured local directories and
 * will be deleted when YARN cleans up the container.
 */
private def createContainerDirs(conf: SparkConf): Array[File] = {
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
val containerDirPath = s"$rootDir/${conf.getenv("CONTAINER_ID")}"
try {
val containerDir = Utils.createDirectory(containerDirPath, "blockmgr")
logInfo(s"Created YARN container directory at $containerDir")
Some(containerDir)
} catch {
case e: IOException =>
logError(s"Failed to create YARN container dir in $containerDirPath." +
s" Ignoring this directory.", e)
None
}
}
}
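
With this in place, the resulting on-disk layout looks roughly as follows (illustrative paths; the blockmgr-<uuid> component comes from Utils.createDirectory and <subDir> is a two-hex-digit name):

    <localDir>/blockmgr-<uuid>/<subDir>/rdd_0_0                              (regular blocks)
    <localDir>/<CONTAINER_ID>/blockmgr-<uuid>/<subDir>/temp_shuffle_<uuid>   (cleaned by YARN)
    <localDir>/<CONTAINER_ID>/blockmgr-<uuid>/<subDir>/temp_local_<uuid>     (cleaned by YARN)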

private def addShutdownHook(): AnyRef = {
logDebug("Adding shutdown hook") // force eager creation of logger
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
@@ -194,15 +245,15 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea

private def doStop(): Unit = {
if (deleteFilesOnStop) {
localDirs.foreach { localDir =>
if (localDir.isDirectory() && localDir.exists()) {
(localDirs ++ containerDirs).foreach { dir =>
if (dir.isDirectory() && dir.exists()) {
try {
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
Utils.deleteRecursively(localDir)
if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(dir)) {
Utils.deleteRecursively(dir)
}
} catch {
case e: Exception =>
logError(s"Exception while deleting local spark dir: $localDir", e)
logError(s"Exception while deleting local spark dir: $dir", e)
}
}
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -96,7 +96,7 @@ private[spark] class DiskStore(
}

def getBytes(blockId: BlockId): BlockData = {
getBytes(diskManager.getFile(blockId.name), getSize(blockId))
getBytes(diskManager.getFile(blockId), getSize(blockId))
}

def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match {
@@ -111,7 +111,7 @@

def remove(blockId: BlockId): Boolean = {
blockSizes.remove(blockId)
val file = diskManager.getFile(blockId.name)
val file = diskManager.getFile(blockId)
if (file.exists()) {
val ret = file.delete()
if (!ret) {
@@ -129,12 +129,12 @@
*/
def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId: BlockId): Unit = {
blockSizes.put(targetBlockId, blockSize)
val targetFile = diskManager.getFile(targetBlockId.name)
val targetFile = diskManager.getFile(targetBlockId)
FileUtils.moveFile(sourceFile, targetFile)
}

def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
val file = diskManager.getFile(blockId)
file.exists()
}

core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage

import java.io.{File, FileWriter}

import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -129,6 +130,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
rootDir0.setExecutable(true)
rootDir1.setExecutable(true)
}
}

test("test write temp file into container dir") {
val conf = spy(testConf.clone)
val containerId = "container_e1987_1564558112805_31178_01_000131"
conf.set("spark.local.dir", rootDirs).set("spark.diskStore.subDirectories", "1")
when(conf.getenv("CONTAINER_ID")).thenReturn(containerId)
when(conf.getenv("LOCAL_DIRS")).thenReturn(System.getProperty("java.io.tmpdir"))
val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
val tempShuffleFile1 = diskBlockManager.createTempShuffleBlock()._2
val tempLocalFile1 = diskBlockManager.createTempLocalBlock()._2
assert(tempShuffleFile1.exists(), "There are no bad disks, so temp shuffle file exists")
assert(tempShuffleFile1.getAbsolutePath.contains(containerId))
assert(tempLocalFile1.exists(), "There are no bad disks, so temp local file exists")
assert(tempLocalFile1.getAbsolutePath.contains(containerId))
}
}
core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -150,7 +150,7 @@ class DiskStoreSuite extends SparkFunSuite {

assert(diskStore.getSize(blockId) === testData.length)

val diskData = Files.toByteArray(diskBlockManager.getFile(blockId.name))
val diskData = Files.toByteArray(diskBlockManager.getFile(blockId))
assert(!Arrays.equals(testData, diskData))

val blockData = diskStore.getBytes(blockId)