Skip to content

Commit 6e121e0

Browse files
committed
resolve review comments and restructure blockmanasger code
1 parent a7aed6c commit 6e121e0

19 files changed

+204
-190
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
249249

250250
// Generate the random name for a temp folder in OffHeap
251251
// Add a timestamp as the suffix here to make it more safe
252-
val offHeapFolderName = "spark-" + randomUUID.toString()
253-
conf.set("spark.offHeapStore.folderName", offHeapFolderName)
252+
val extBlkFolderName = "spark-" + randomUUID.toString()
253+
@deprecated("Use extBlkFolderName instead.", "1.4.0")
254+
val tachyonFolderName = extBlkFolderName
255+
conf.set("spark.extBlkStore.folderName", extBlkFolderName)
254256

255257
def isLocal: Boolean = (master == "local" || master.startsWith("local["))
256258

core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ object ExecutorExitCode {
3333
/** DiskStore failed to create a local temporary directory after many attempts. */
3434
val DISK_STORE_FAILED_TO_CREATE_DIR = 53
3535

36-
/** OffHeapStore failed to initialize after many attempts. */
37-
val OFFHEAP_STORE_FAILED_TO_INITIALIZE = 54
36+
/** ExtBlkStore failed to initialize after many attempts. */
37+
val ExtBlk_STORE_FAILED_TO_INITIALIZE = 54
3838

39-
/** OffHeapStore failed to create a local temporary directory after many attempts. */
40-
val OFFHEAP_STORE_FAILED_TO_CREATE_DIR = 55
39+
/** ExtBlkStore failed to create a local temporary directory after many attempts. */
40+
val ExtBlk_STORE_FAILED_TO_CREATE_DIR = 55
4141

4242
def explainExitCode(exitCode: Int): String = {
4343
exitCode match {
@@ -46,9 +46,11 @@ object ExecutorExitCode {
4646
case OOM => "OutOfMemoryError"
4747
case DISK_STORE_FAILED_TO_CREATE_DIR =>
4848
"Failed to create local directory (bad spark.local.dir?)"
49-
case OFFHEAP_STORE_FAILED_TO_INITIALIZE => "OffHeap Store failed to initialize."
50-
case OFFHEAP_STORE_FAILED_TO_CREATE_DIR =>
51-
"OffHeap Store failed to create a local temporary directory."
49+
// TODO: replace external block store with concreate implementation desc
50+
case ExtBlk_STORE_FAILED_TO_INITIALIZE => "External Block Store failed to initialize."
51+
// TODO: replace external block store with concreate implementation desc
52+
case ExtBlk_STORE_FAILED_TO_CREATE_DIR =>
53+
"External Block Store failed to create a local temporary directory."
5254
case _ =>
5355
"Unknown executor exit code (" + exitCode + ")" + (
5456
if (exitCode > 128) {

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,9 +1475,9 @@ abstract class RDD[T: ClassTag](
14751475

14761476
val persistence = if (storageLevel != StorageLevel.NONE) storageLevel.description else ""
14771477
val storageInfo = rdd.context.getRDDStorageInfo.filter(_.id == rdd.id).map(info =>
1478-
" CachedPartitions: %d; MemorySize: %s; OffHeap: %s; DiskSize: %s".format(
1478+
" CachedPartitions: %d; MemorySize: %s; ExtBlkStoreSize: %s; DiskSize: %s".format(
14791479
info.numCachedPartitions, bytesToString(info.memSize),
1480-
bytesToString(info.offHeapSize), bytesToString(info.diskSize)))
1480+
bytesToString(info.extBlkStoreSize), bytesToString(info.diskSize)))
14811481

14821482
s"$rdd [$persistence]" +: storageInfo
14831483
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,10 @@ private[spark] class BlockManager(
7878
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
7979

8080
// Actual storage of where blocks are kept
81-
private var offHeapInitialized = false
81+
private var extBlkStoreInitialized = false
8282
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
8383
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
84-
private[spark] lazy val offHeapStore: OffHeapStore = new OffHeapStore(this, executorId)
84+
private[spark] lazy val extBlkStore: ExtBlockStore = new ExtBlockStore(this, executorId)
8585

8686
private[spark]
8787
val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -367,10 +367,10 @@ private[spark] class BlockManager(
367367
if (info.tellMaster) {
368368
val storageLevel = status.storageLevel
369369
val inMemSize = Math.max(status.memSize, droppedMemorySize)
370-
val inOffHeapSize = status.offHeapSize
370+
val inExtBlkStoreSize = status.extBlkStoreSize
371371
val onDiskSize = status.diskSize
372372
master.updateBlockInfo(
373-
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inOffHeapSize)
373+
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inExtBlkStoreSize)
374374
} else {
375375
true
376376
}
@@ -388,15 +388,15 @@ private[spark] class BlockManager(
388388
BlockStatus(StorageLevel.NONE, 0L, 0L, 0L)
389389
case level =>
390390
val inMem = level.useMemory && memoryStore.contains(blockId)
391-
val inOffHeap = level.useOffHeap && offHeapStore.contains(blockId)
391+
val inExtBlkStore = level.useOffHeap && extBlkStore.contains(blockId)
392392
val onDisk = level.useDisk && diskStore.contains(blockId)
393393
val deserialized = if (inMem) level.deserialized else false
394-
val replication = if (inMem || inOffHeap || onDisk) level.replication else 1
395-
val storageLevel = StorageLevel(onDisk, inMem, inOffHeap, deserialized, replication)
394+
val replication = if (inMem || inExtBlkStore || onDisk) level.replication else 1
395+
val storageLevel = StorageLevel(onDisk, inMem, inExtBlkStore, deserialized, replication)
396396
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
397-
val offHeapSize = if (inOffHeap) offHeapStore.getSize(blockId) else 0L
397+
val extBlkStoreSize = if (inExtBlkStore) extBlkStore.getSize(blockId) else 0L
398398
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
399-
BlockStatus(storageLevel, memSize, diskSize, offHeapSize)
399+
BlockStatus(storageLevel, memSize, diskSize, extBlkStoreSize)
400400
}
401401
}
402402
}
@@ -476,11 +476,11 @@ private[spark] class BlockManager(
476476
}
477477
}
478478

479-
// Look for the block in offheap
479+
// Look for the block in external blk store
480480
if (level.useOffHeap) {
481-
logDebug(s"Getting block $blockId from offheap")
482-
if (offHeapStore.contains(blockId)) {
483-
offHeapStore.getBytes(blockId) match {
481+
logDebug(s"Getting block $blockId from ExtBlkStore")
482+
if (extBlkStore.contains(blockId)) {
483+
extBlkStore.getBytes(blockId) match {
484484
case Some(bytes) =>
485485
if (!asBlockResult) {
486486
return Some(bytes)
@@ -489,7 +489,7 @@ private[spark] class BlockManager(
489489
dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
490490
}
491491
case None =>
492-
logDebug(s"Block $blockId not found in offHeap")
492+
logDebug(s"Block $blockId not found in extBlkStore")
493493
}
494494
}
495495
}
@@ -757,8 +757,8 @@ private[spark] class BlockManager(
757757
// We will drop it to disk later if the memory store can't hold it.
758758
(true, memoryStore)
759759
} else if (putLevel.useOffHeap) {
760-
// Use off-heap storage
761-
(false, offHeapStore)
760+
// Use external blk storage
761+
(false, extBlkStore)
762762
} else if (putLevel.useDisk) {
763763
// Don't get back the bytes from put unless we replicate them
764764
(putLevel.replication > 1, diskStore)
@@ -793,7 +793,7 @@ private[spark] class BlockManager(
793793

794794
val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
795795
if (putBlockStatus.storageLevel != StorageLevel.NONE) {
796-
// Now that the block is in either the memory, offheap, or disk store,
796+
// Now that the block is in either the memory, extBlkStore, or disk store,
797797
// let other threads read it, and tell the master about it.
798798
marked = true
799799
putBlockInfo.markReady(size)
@@ -1090,10 +1090,11 @@ private[spark] class BlockManager(
10901090
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
10911091
val removedFromMemory = memoryStore.remove(blockId)
10921092
val removedFromDisk = diskStore.remove(blockId)
1093-
val removedFromOffHeap = if (offHeapInitialized) offHeapStore.remove(blockId) else false
1094-
if (!removedFromMemory && !removedFromDisk && !removedFromOffHeap) {
1093+
val removedFromExtBlkStore =
1094+
if (extBlkStoreInitialized) extBlkStore.remove(blockId) else false
1095+
if (!removedFromMemory && !removedFromDisk && !removedFromExtBlkStore) {
10951096
logWarning(s"Block $blockId could not be removed as it was not found in either " +
1096-
"the disk, memory, or offheap store")
1097+
"the disk, memory, or external blk store")
10971098
}
10981099
blockInfo.remove(blockId)
10991100
if (tellMaster && info.tellMaster) {
@@ -1127,7 +1128,7 @@ private[spark] class BlockManager(
11271128
val level = info.level
11281129
if (level.useMemory) { memoryStore.remove(id) }
11291130
if (level.useDisk) { diskStore.remove(id) }
1130-
if (level.useOffHeap) { offHeapStore.remove(id) }
1131+
if (level.useOffHeap) { extBlkStore.remove(id) }
11311132
iterator.remove()
11321133
logInfo(s"Dropped block $id")
11331134
}
@@ -1207,8 +1208,8 @@ private[spark] class BlockManager(
12071208
blockInfo.clear()
12081209
memoryStore.clear()
12091210
diskStore.clear()
1210-
if (offHeapInitialized) {
1211-
offHeapStore.clear()
1211+
if (extBlkStoreInitialized) {
1212+
extBlkStore.clear()
12121213
}
12131214
metadataCleaner.cancel()
12141215
broadcastCleaner.cancel()

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ class BlockManagerMasterEndpoint(
6060
context.reply(true)
6161

6262
case UpdateBlockInfo(
63-
blockManagerId, blockId, storageLevel, deserializedSize, size, offHeapSize) =>
63+
blockManagerId, blockId, storageLevel, deserializedSize, size, extBlkStoreSize) =>
6464
context.reply(updateBlockInfo(
65-
blockManagerId, blockId, storageLevel, deserializedSize, size, offHeapSize))
65+
blockManagerId, blockId, storageLevel, deserializedSize, size, extBlkStoreSize))
6666

6767
case GetLocations(blockId) =>
6868
context.reply(getLocations(blockId))
@@ -314,7 +314,7 @@ class BlockManagerMasterEndpoint(
314314
storageLevel: StorageLevel,
315315
memSize: Long,
316316
diskSize: Long,
317-
offHeapSize: Long): Boolean = {
317+
extBlkStoreSize: Long): Boolean = {
318318

319319
if (!blockManagerInfo.contains(blockManagerId)) {
320320
if (blockManagerId.isDriver && !isLocal) {
@@ -332,7 +332,7 @@ class BlockManagerMasterEndpoint(
332332
}
333333

334334
blockManagerInfo(blockManagerId).updateBlockInfo(
335-
blockId, storageLevel, memSize, diskSize, offHeapSize)
335+
blockId, storageLevel, memSize, diskSize, extBlkStoreSize)
336336

337337
var locations: mutable.HashSet[BlockManagerId] = null
338338
if (blockLocations.containsKey(blockId)) {
@@ -396,8 +396,8 @@ case class BlockStatus(
396396
storageLevel: StorageLevel,
397397
memSize: Long,
398398
diskSize: Long,
399-
offHeapSize: Long) {
400-
def isCached: Boolean = memSize + diskSize + offHeapSize > 0
399+
extBlkStoreSize: Long) {
400+
def isCached: Boolean = memSize + diskSize + extBlkStoreSize > 0
401401
}
402402

403403
@DeveloperApi
@@ -429,7 +429,7 @@ private[spark] class BlockManagerInfo(
429429
storageLevel: StorageLevel,
430430
memSize: Long,
431431
diskSize: Long,
432-
offHeapSize: Long) {
432+
extBlkStoreSize: Long) {
433433

434434
updateLastSeenMs()
435435

@@ -445,9 +445,9 @@ private[spark] class BlockManagerInfo(
445445
}
446446

447447
if (storageLevel.isValid) {
448-
/* isValid means it is either stored in-memory, on-disk or on-OffHeap.
448+
/* isValid means it is either stored in-memory, on-disk or on-extBlkStore.
449449
* The memSize here indicates the data size in or dropped from memory,
450-
* offHeapSize here indicates the data size in or dropped from offHeap,
450+
* extBlkStoreSize here indicates the data size in or dropped from extBlkStore,
451451
* and the diskSize here indicates the data size in or dropped to disk.
452452
* They can be both larger than 0, when a block is dropped from memory to disk.
453453
* Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
@@ -464,9 +464,9 @@ private[spark] class BlockManagerInfo(
464464
blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
465465
}
466466
if (storageLevel.useOffHeap) {
467-
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, offHeapSize))
468-
logInfo("Added %s on offHeap on %s (size: %s)".format(
469-
blockId, blockManagerId.hostPort, Utils.bytesToString(offHeapSize)))
467+
_blocks.put(blockId, BlockStatus(storageLevel, 0, 0, extBlkStoreSize))
468+
logInfo("Added %s on ExtBlkStore on %s (size: %s)".format(
469+
blockId, blockManagerId.hostPort, Utils.bytesToString(extBlkStoreSize)))
470470
}
471471
} else if (_blocks.containsKey(blockId)) {
472472
// If isValid is not true, drop the block.
@@ -482,8 +482,8 @@ private[spark] class BlockManagerInfo(
482482
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.diskSize)))
483483
}
484484
if (blockStatus.storageLevel.useOffHeap) {
485-
logInfo("Removed %s on %s on offheap (size: %s)".format(
486-
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.offHeapSize)))
485+
logInfo("Removed %s on %s on extBlkStore (size: %s)".format(
486+
blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.extBlkStoreSize)))
487487
}
488488
}
489489
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ private[spark] object BlockManagerMessages {
6060
var storageLevel: StorageLevel,
6161
var memSize: Long,
6262
var diskSize: Long,
63-
var offHeapSize: Long)
63+
var extBlkStoreSize: Long)
6464
extends ToBlockManagerMaster
6565
with Externalizable {
6666

@@ -72,7 +72,7 @@ private[spark] object BlockManagerMessages {
7272
storageLevel.writeExternal(out)
7373
out.writeLong(memSize)
7474
out.writeLong(diskSize)
75-
out.writeLong(offHeapSize)
75+
out.writeLong(extBlkStoreSize)
7676
}
7777

7878
override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
@@ -81,7 +81,7 @@ private[spark] object BlockManagerMessages {
8181
storageLevel = StorageLevel(in)
8282
memSize = in.readLong()
8383
diskSize = in.readLong()
84-
offHeapSize = in.readLong()
84+
extBlkStoreSize = in.readLong()
8585
}
8686
}
8787

core/src/main/scala/org/apache/spark/storage/OffHeapBlockManager.scala renamed to core/src/main/scala/org/apache/spark/storage/ExtBlockManager.scala

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ import org.apache.spark.Logging
2323
import scala.util.control.NonFatal
2424

2525

26-
trait OffHeapBlockManager {
26+
private[spark] abstract class ExtBlockManager {
2727

2828
/**
2929
* desc for the implementation.
3030
*
3131
*/
32-
def desc(): String = {"OffHeap"}
32+
def desc(): String = {"External Block Store"}
3333

3434
/**
3535
* initialize a concrete block manager implementation.
@@ -39,7 +39,7 @@ trait OffHeapBlockManager {
3939
def init(blockManager: BlockManager, executorId: String): Unit
4040

4141
/**
42-
* remove the cache from offheap
42+
* remove the cache from ExtBlkStore
4343
*
4444
* @throws java.io.IOException when FS failure in removing file.
4545
*/
@@ -53,14 +53,14 @@ trait OffHeapBlockManager {
5353
def fileExists(blockId: BlockId): Boolean
5454

5555
/**
56-
* save the cache to the offheap.
56+
* save the cache to the ExtBlkStore.
5757
*
5858
* @throws java.io.IOException when FS failure in put blocks.
5959
*/
6060
def putBytes(blockId: BlockId, bytes: ByteBuffer)
6161

6262
/**
63-
* retrieve the cache from offheap
63+
* retrieve the cache from ExtBlkStore
6464
*
6565
* @throws java.io.IOException when FS failure in get blocks.
6666
*/
@@ -78,32 +78,4 @@ trait OffHeapBlockManager {
7878
*
7979
*/
8080
def addShutdownHook()
81-
82-
final def setup(blockManager: BlockManager, executorId: String): Unit = {
83-
init(blockManager, executorId)
84-
addShutdownHook()
85-
}
86-
}
87-
88-
object OffHeapBlockManager extends Logging{
89-
val MAX_DIR_CREATION_ATTEMPTS = 10
90-
val SUB_DIRS_PER_DIR = "64"
91-
def create(blockManager: BlockManager, executorId: String): Option[OffHeapBlockManager] = {
92-
val sNames = blockManager.conf.getOption("spark.offHeapStore.blockManager")
93-
sNames match {
94-
case Some(name) =>
95-
try {
96-
val instance = Class.forName(name)
97-
.newInstance()
98-
.asInstanceOf[OffHeapBlockManager]
99-
instance.setup(blockManager, executorId)
100-
Some(instance)
101-
} catch {
102-
case NonFatal(t) =>
103-
logError("Cannot initialize offHeap store")
104-
None
105-
}
106-
case None => None
107-
}
108-
}
10981
}

0 commit comments

Comments
 (0)