Skip to content

Commit 36a7a68

Browse files
zhzhan authored and pwendell committed
[SPARK-6479] [BLOCK MANAGER] Create off-heap block storage API
This adds the classes for creating the off-heap block storage API. It also includes the migration for Tachyon. The diff looks big, but it mainly just renames tachyon to offheap. A new implementation for HDFS will be submitted for review in SPARK-6112. Author: Zhan Zhang <[email protected]> Closes apache#5430 from zhzhan/SPARK-6479 and squashes the following commits: 60acd84 [Zhan Zhang] minor change to kickoff the test 12f54c9 [Zhan Zhang] solve merge conflicts a54132c [Zhan Zhang] solve review comments ffb8e00 [Zhan Zhang] rebase to sparkcontext change 6e121e0 [Zhan Zhang] resolve review comments and restructure blockmanasger code a7aed6c [Zhan Zhang] add Tachyon migration code 186de31 [Zhan Zhang] initial commit for off-heap block storage api
1 parent b5347a4 commit 36a7a68

22 files changed

+536
-331
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
247247
private[spark] def eventLogDir: Option[URI] = _eventLogDir
248248
private[spark] def eventLogCodec: Option[String] = _eventLogCodec
249249

250-
// Generate the random name for a temp folder in Tachyon
250+
// Generate the random name for a temp folder in external block store.
251251
// Add a timestamp as the suffix here to make it more safe
252-
val tachyonFolderName = "spark-" + randomUUID.toString()
252+
val externalBlockStoreFolderName = "spark-" + randomUUID.toString()
253+
@deprecated("Use externalBlockStoreFolderName instead.", "1.4.0")
254+
val tachyonFolderName = externalBlockStoreFolderName
253255

254256
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
255257

@@ -386,7 +388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
386388
}
387389
}
388390

389-
_conf.set("spark.tachyonStore.folderName", tachyonFolderName)
391+
_conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
390392

391393
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
392394

core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ object ExecutorExitCode {
3333
/** DiskStore failed to create a local temporary directory after many attempts. */
3434
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
3535

36-
/** TachyonStore failed to initialize after many attempts. */
37-
val TACHYON_STORE_FAILED_TO_INITIALIZE = 54
36+
/** ExternalBlockStore failed to initialize after many attempts. */
37+
val EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE = 54
3838

39-
/** TachyonStore failed to create a local temporary directory after many attempts. */
40-
val TACHYON_STORE_FAILED_TO_CREATE_DIR = 55
39+
/** ExternalBlockStore failed to create a local temporary directory after many attempts. */
40+
val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
4141

4242
def explainExitCode(exitCode: Int): String = {
4343
exitCode match {
@@ -46,9 +46,11 @@ object ExecutorExitCode {
4646
case OOM => "OutOfMemoryError"
4747
case DISK_STORE_FAILED_TO_CREATE_DIR =>
4848
"Failed to create local directory (bad spark.local.dir?)"
49-
case TACHYON_STORE_FAILED_TO_INITIALIZE => "TachyonStore failed to initialize."
50-
case TACHYON_STORE_FAILED_TO_CREATE_DIR =>
51-
"TachyonStore failed to create a local temporary directory."
49+
// TODO: replace external block store with concrete implementation name
50+
case EXTERNAL_BLOCK_STORE_FAILED_TO_INITIALIZE => "ExternalBlockStore failed to initialize."
51+
// TODO: replace external block store with concrete implementation name
52+
case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
53+
"ExternalBlockStore failed to create a local temporary directory."
5254
case _ =>
5355
"Unknown executor exit code (" + exitCode + ")" + (
5456
if (exitCode > 128) {

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](
14751475

14761476
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
14771477
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
1478-
" CachedPartitions: %d; MemorySize: %s; TachyonSize: %s; DiskSize: %s".format(
1478+
" CachedPartitions: %d; MemorySize: %s; ExternalBlockStoreSize: %s; DiskSize: %s".format(
14791479
info.numCachedPartitions, bytesToString(info.memSize),
1480-
bytesToString(info.tachyonSize), bytesToString(info.diskSize)))
1480+
bytesToString(info.externalBlockStoreSize), bytesToString(info.diskSize)))
14811481

14821482
s"$rdd [$persistence]" +: storageInfo
14831483
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,11 @@ private[spark] class BlockManager(
7878
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
7979

8080
// Actual storage of where blocks are kept
81-
private var tachyonInitialized = false
81+
private var externalBlockStoreInitialized = false
8282
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
8383
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
84-
private[spark] lazy val tachyonStore: TachyonStore = {
85-
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
86-
val appFolderName = conf.get("spark.tachyonStore.folderName")
87-
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
88-
val tachyonMaster = conf.get("spark.tachyonStore.url", "tachyon://localhost:19998")
89-
val tachyonBlockManager =
90-
new TachyonBlockManager(this, tachyonStorePath, tachyonMaster)
91-
tachyonInitialized = true
92-
new TachyonStore(this, tachyonBlockManager)
93-
}
84+
private[spark] lazy val externalBlockStore: ExternalBlockStore =
85+
new ExternalBlockStore(this, executorId)
9486

9587
private[spark]
9688
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -320,13 +312,13 @@ private[spark] class BlockManager(
320312

321313
/**
322314
* Get the BlockStatus for the block identified by the given ID, if it exists.
323-
* NOTE: This is mainly for testing, and it doesn't fetch information from Tachyon.
315+
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
324316
*/
325317
def getStatus(blockId: BlockId): Option[BlockStatus] = {
326318
blockInfo.get(blockId).map { info =>
327319
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
328320
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
329-
// Assume that block is not in Tachyon
321+
// Assume that block is not in external block store
330322
BlockStatus(info.level, memSize, diskSize, 0L)
331323
}
332324
}
@@ -376,10 +368,10 @@ private[spark] class BlockManager(
376368
if (info.tellMaster) {
377369
val storageLevel = status.storageLevel
378370
val inMemSize = Math.max(status.memSize, droppedMemorySize)
379-
val inTachyonSize = status.tachyonSize
371+
val inExternalBlockStoreSize = status.externalBlockStoreSize
380372
val onDiskSize = status.diskSize
381373
master.updateBlockInfo(
382-
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
374+
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExternalBlockStoreSize)
383375
} else {
384376
true
385377
}
@@ -397,15 +389,17 @@ private[spark] class BlockManager(
397389
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
398390
case level =>
399391
val inMem = level.useMemory && memoryStore.contains(blockId)
400-
val inTachyon = level.useOffHeap && tachyonStore.contains(blockId)
392+
val inExternalBlockStore = level.useOffHeap && externalBlockStore.contains(blockId)
401393
val onDisk = level.useDisk && diskStore.contains(blockId)
402394
val deserialized = if (inMem) level.deserialized else false
403-
val replication = if (inMem || inTachyon || onDisk) level.replication else 1
404-
val storageLevel = StorageLevel(onDisk, inMem, inTachyon, deserialized, replication)
395+
val replication = if (inMem || inExternalBlockStore || onDisk) level.replication else 1
396+
val storageLevel =
397+
StorageLevel(onDisk, inMem, inExternalBlockStore, deserialized, replication)
405398
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
406-
val tachyonSize = if (inTachyon) tachyonStore.getSize(blockId) else 0L
399+
val externalBlockStoreSize =
400+
if (inExternalBlockStore) externalBlockStore.getSize(blockId) else 0L
407401
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
408-
BlockStatus(storageLevel, memSize, diskSize, tachyonSize)
402+
BlockStatus(storageLevel, memSize, diskSize, externalBlockStoreSize)
409403
}
410404
}
411405
}
@@ -485,11 +479,11 @@ private[spark] class BlockManager(
485479
}
486480
}
487481

488-
// Look for the block in Tachyon
482+
// Look for the block in external block store
489483
if (level.useOffHeap) {
490-
logDebug(s"Getting block $blockId from tachyon")
491-
if (tachyonStore.contains(blockId)) {
492-
tachyonStore.getBytes(blockId) match {
484+
logDebug(s"Getting block $blockId from ExternalBlockStore")
485+
if (externalBlockStore.contains(blockId)) {
486+
externalBlockStore.getBytes(blockId) match {
493487
case Some(bytes) =>
494488
if (!asBlockResult) {
495489
return Some(bytes)
@@ -498,7 +492,7 @@ private[spark] class BlockManager(
498492
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
499493
}
500494
case None =>
501-
logDebug(s"Block $blockId not found in tachyon")
495+
logDebug(s"Block $blockId not found in externalBlockStore")
502496
}
503497
}
504498
}
@@ -766,8 +760,8 @@ private[spark] class BlockManager(
766760
// We will drop it to disk later if the memory store can't hold it.
767761
(true, memoryStore)
768762
} else if (putLevel.useOffHeap) {
769-
// Use tachyon for off-heap storage
770-
(false, tachyonStore)
763+
// Use external block store
764+
(false, externalBlockStore)
771765
} else if (putLevel.useDisk) {
772766
// Don't get back the bytes from put unless we replicate them
773767
(putLevel.replication > 1, diskStore)
@@ -802,7 +796,7 @@ private[spark] class BlockManager(
802796

803797
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
804798
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
805-
// Now that the block is in either the memory, tachyon, or disk store,
799+
// Now that the block is in either the memory, externalBlockStore, or disk store,
806800
// let other threads read it, and tell the master about it.
807801
marked = true
808802
putBlockInfo.markReady(size)
@@ -1099,10 +1093,11 @@ private[spark] class BlockManager(
10991093
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
11001094
val removedFromMemory = memoryStore.remove(blockId)
11011095
val removedFromDisk = diskStore.remove(blockId)
1102-
val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
1103-
if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
1096+
val removedFromExternalBlockStore =
1097+
if (externalBlockStoreInitialized) externalBlockStore.remove(blockId) else false
1098+
if (!removedFromMemory && !removedFromDisk && !removedFromExternalBlockStore) {
11041099
logWarning(s"Block $blockId could not be removed as it was not found in either " +
1105-
"the disk, memory, or tachyon store")
1100+
"the disk, memory, or external block store")
11061101
}
11071102
blockInfo.remove(blockId)
11081103
if (tellMaster && info.tellMaster) {
@@ -1136,7 +1131,7 @@ private[spark] class BlockManager(
11361131
val level = info.level
11371132
if (level.useMemory) { memoryStore.remove(id) }
11381133
if (level.useDisk) { diskStore.remove(id) }
1139-
if (level.useOffHeap) { tachyonStore.remove(id) }
1134+
if (level.useOffHeap) { externalBlockStore.remove(id) }
11401135
iterator.remove()
11411136
logInfo(s"Dropped block $id")
11421137
}
@@ -1216,8 +1211,8 @@ private[spark] class BlockManager(
12161211
blockInfo.clear()
12171212
memoryStore.clear()
12181213
diskStore.clear()
1219-
if (tachyonInitialized) {
1220-
tachyonStore.clear()
1214+
if (externalBlockStoreInitialized) {
1215+
externalBlockStore.clear()
12211216
}
12221217
metadataCleaner.cancel()
12231218
broadcastCleaner.cancel()

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@ class BlockManagerMaster(
5454
storageLevel: StorageLevel,
5555
memSize: Long,
5656
diskSize: Long,
57-
tachyonSize: Long): Boolean = {
57+
externalBlockStoreSize: Long): Boolean = {
5858
val res = driverEndpoint.askWithRetry[Boolean](
59-
UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
59+
UpdateBlockInfo(blockManagerId, blockId, storageLevel,
60+
memSize, diskSize, externalBlockStoreSize))
6061
logDebug(s"Updated info of block $blockId")
6162
res
6263
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
6060
context.reply(true)
6161

6262
case UpdateBlockInfo(
63-
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
63+
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize) =>
6464
context.reply(updateBlockInfo(
65-
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize))
65+
blockManagerId, blockId, storageLevel, deserializedSize, size, externalBlockStoreSize))
6666

6767
case GetLocations(blockId) =>
6868
context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
314314
storageLevel: StorageLevel,
315315
memSize: Long,
316316
diskSize: Long,
317-
tachyonSize: Long): Boolean = {
317+
externalBlockStoreSize: Long): Boolean = {
318318

319319
if (!blockManagerInfo.contains(blockManagerId)) {
320320
if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
332332
}
333333

334334
blockManagerInfo(blockManagerId).updateBlockInfo(
335-
blockId, storageLevel, memSize, diskSize, tachyonSize)
335+
blockId, storageLevel, memSize, diskSize, externalBlockStoreSize)
336336

337337
var locations: mutable.HashSet[BlockManagerId] = null
338338
if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
396396
storageLevel: StorageLevel,
397397
memSize: Long,
398398
diskSize: Long,
399-
tachyonSize: Long) {
400-
def isCached: Boolean = memSize + diskSize + tachyonSize > 0
399+
externalBlockStoreSize: Long) {
400+
def isCached: Boolean = memSize + diskSize + externalBlockStoreSize > 0
401401
}
402402

403403
@DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
429429
storageLevel: StorageLevel,
430430
memSize: Long,
431431
diskSize: Long,
432-
tachyonSize: Long) {
432+
externalBlockStoreSize: Long) {
433433

434434
updateLastSeenMs()
435435

@@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
445445
}
446446

447447
if (storageLevel.isValid) {
448-
/* isValid means it is either stored in-memory, on-disk or on-Tachyon.
448+
/* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
449449
* The memSize here indicates the data size in or dropped from memory,
450-
* tachyonSize here indicates the data size in or dropped from Tachyon,
450+
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
451451
* and the diskSize here indicates the data size in or dropped to disk.
452452
* They can be both larger than 0, when a block is dropped from memory to disk.
453453
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
@@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
464464
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
465465
}
466466
if (storageLevel.useOffHeap) {
467-
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, tachyonSize))
468-
logInfo("Added %s on tachyon on %s (size: %s)".format(
469-
blockId, blockManagerId.hostPort, Utils.bytesToString(tachyonSize)))
467+
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
468+
logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
469+
blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
470470
}
471471
} else if (_blocks.containsKey(blockId)) {
472472
// If isValid is not true, drop the block.
@@ -482,8 +482,9 @@ private[spark] class BlockManagerInfo(
482482
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
483483
}
484484
if (blockStatus.storageLevel.useOffHeap) {
485-
logInfo("Removed %s on %s on tachyon (size: %s)".format(
486-
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.tachyonSize)))
485+
logInfo("Removed %s on %s on externalBlockStore (size: %s)".format(
486+
blockId, blockManagerId.hostPort,
487+
Utils.bytesToString(blockStatus.externalBlockStoreSize)))
487488
}
488489
}
489490
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
6060
var storageLevel: StorageLevel,
6161
var memSize: Long,
6262
var diskSize: Long,
63-
var tachyonSize: Long)
63+
var externalBlockStoreSize: Long)
6464
extends ToBlockManagerMaster
6565
with Externalizable {
6666

@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
7272
storageLevel.writeExternal(out)
7373
out.writeLong(memSize)
7474
out.writeLong(diskSize)
75-
out.writeLong(tachyonSize)
75+
out.writeLong(externalBlockStoreSize)
7676
}
7777

7878
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
8181
storageLevel = StorageLevel(in)
8282
memSize = in.readLong()
8383
diskSize = in.readLong()
84-
tachyonSize = in.readLong()
84+
externalBlockStoreSize = in.readLong()
8585
}
8686
}
8787

0 commit comments

Comments
 (0)